AgentPDF / nodes /embeddings_creator.py
rwayz's picture
Deploy
6b29104
"""
Nó de criação de embeddings e vector store para o AgentPDF.
Este nó é responsável por gerar embeddings dos chunks de texto
e criar um vector store FAISS para recuperação eficiente.
"""
from typing import Dict, Any, List
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.runnables import RunnableConfig
from langchain_core.documents import Document
from agents.state import PDFState, ProcessingStatus
from utils.config import Config, get_openai_api_key
from utils.logger import log_node_execution, main_logger
def embeddings_creation_node(state: PDFState, config: RunnableConfig) -> Dict[str, Any]:
"""
Nó responsável por criar embeddings e vector store.
Este nó:
1. Recebe os chunks de texto processados
2. Gera embeddings usando OpenAI
3. Cria um vector store FAISS
4. Atualiza o estado com o vector store
Args:
state: Estado atual do grafo contendo os chunks
config: Configuração do LangGraph
Returns:
Dict[str, Any]: Atualizações para o estado
"""
log_node_execution("EMBEDDINGS_CREATOR", "START", "Iniciando criação de embeddings")
try:
# Verifica se há chunks para processar
pdf_chunks = state.get("pdf_chunks")
if not pdf_chunks:
error_msg = "Nenhum chunk encontrado para criar embeddings"
log_node_execution("EMBEDDINGS_CREATOR", "ERROR", error_msg)
return {
"processing_status": ProcessingStatus.ERROR,
"error_message": error_msg
}
# Verifica se a API key está configurada
api_key = get_openai_api_key()
if not api_key:
error_msg = "Chave da API OpenAI não configurada"
log_node_execution("EMBEDDINGS_CREATOR", "ERROR", error_msg)
return {
"processing_status": ProcessingStatus.ERROR,
"error_message": error_msg
}
log_node_execution(
"EMBEDDINGS_CREATOR",
"PROCESSING",
f"Criando embeddings para {len(pdf_chunks)} chunks"
)
# Cria o modelo de embeddings
embeddings_model = create_embeddings_model()
# Converte chunks em documentos
documents = create_documents_from_chunks(pdf_chunks)
# Cria o vector store
vector_store = create_vector_store(documents, embeddings_model)
log_node_execution(
"EMBEDDINGS_CREATOR",
"SUCCESS",
f"Vector store criado com {len(documents)} documentos"
)
return {
"vector_store": vector_store,
"embeddings_created": True,
"processing_status": ProcessingStatus.IDLE, # Pronto para perguntas
"error_message": None
}
except Exception as e:
error_msg = f"Erro ao criar embeddings: {str(e)}"
log_node_execution("EMBEDDINGS_CREATOR", "ERROR", error_msg)
main_logger.exception("Erro detalhado na criação de embeddings:")
return {
"processing_status": ProcessingStatus.ERROR,
"error_message": error_msg,
"embeddings_created": False
}
def create_embeddings_model() -> OpenAIEmbeddings:
"""
Cria e configura o modelo de embeddings OpenAI.
Returns:
OpenAIEmbeddings: Modelo de embeddings configurado
"""
try:
embeddings = OpenAIEmbeddings(
openai_api_key=get_openai_api_key(),
model="text-embedding-3-small", # Modelo mais eficiente
chunk_size=1000, # Tamanho do chunk para embeddings
max_retries=3,
timeout=30
)
main_logger.debug("Modelo de embeddings OpenAI criado com sucesso")
return embeddings
except Exception as e:
main_logger.error(f"Erro ao criar modelo de embeddings: {e}")
raise
def create_documents_from_chunks(chunks: List[str]) -> List[Document]:
"""
Converte chunks de texto em objetos Document do LangChain.
Args:
chunks: Lista de chunks de texto
Returns:
List[Document]: Lista de documentos LangChain
"""
documents = []
for i, chunk in enumerate(chunks):
# Cria metadados para cada documento
metadata = {
"chunk_id": i,
"chunk_size": len(chunk),
"source": "pdf_upload",
"chunk_index": i
}
# Cria o documento
doc = Document(
page_content=chunk,
metadata=metadata
)
documents.append(doc)
main_logger.debug(f"Criados {len(documents)} documentos a partir dos chunks")
return documents
def create_vector_store(documents: List[Document], embeddings_model: OpenAIEmbeddings) -> FAISS:
"""
Cria um vector store FAISS a partir dos documentos.
Args:
documents: Lista de documentos
embeddings_model: Modelo de embeddings
Returns:
FAISS: Vector store criado
"""
try:
main_logger.info("Criando vector store FAISS...")
# Cria o vector store
vector_store = FAISS.from_documents(
documents=documents,
embedding=embeddings_model
)
main_logger.info(f"Vector store FAISS criado com {len(documents)} documentos")
# Log estatísticas
log_vector_store_stats(vector_store, documents)
return vector_store
except Exception as e:
main_logger.error(f"Erro ao criar vector store FAISS: {e}")
raise
def log_vector_store_stats(vector_store: FAISS, documents: List[Document]):
"""
Registra estatísticas do vector store criado.
Args:
vector_store: Vector store FAISS
documents: Lista de documentos
"""
try:
# Estatísticas básicas
total_docs = len(documents)
total_chars = sum(len(doc.page_content) for doc in documents)
avg_doc_size = total_chars / total_docs if total_docs > 0 else 0
main_logger.info(f"📊 Estatísticas do Vector Store:")
main_logger.info(f" • Total de documentos: {total_docs}")
main_logger.info(f" • Total de caracteres: {total_chars:,}")
main_logger.info(f" • Tamanho médio por documento: {avg_doc_size:.0f} caracteres")
# Testa uma busca simples para verificar funcionamento
test_results = vector_store.similarity_search("teste", k=1)
main_logger.debug(f"Teste de busca retornou {len(test_results)} resultado(s)")
except Exception as e:
main_logger.warning(f"Erro ao calcular estatísticas do vector store: {e}")
def test_vector_store(vector_store: FAISS, test_query: str = "informação") -> bool:
"""
Testa o funcionamento do vector store.
Args:
vector_store: Vector store para testar
test_query: Query de teste
Returns:
bool: True se o teste passou
"""
try:
# Testa busca por similaridade
results = vector_store.similarity_search(test_query, k=3)
if not results:
main_logger.warning("Vector store não retornou resultados para query de teste")
return False
# Testa busca com score
results_with_score = vector_store.similarity_search_with_score(test_query, k=3)
if not results_with_score:
main_logger.warning("Vector store não retornou scores para query de teste")
return False
main_logger.debug(f"Teste do vector store passou: {len(results)} resultados encontrados")
return True
except Exception as e:
main_logger.error(f"Erro no teste do vector store: {e}")
return False
def optimize_vector_store(vector_store: FAISS) -> FAISS:
"""
Otimiza o vector store para melhor performance.
Args:
vector_store: Vector store original
Returns:
FAISS: Vector store otimizado
"""
try:
# Para FAISS, podemos otimizar o índice
# Isso é especialmente útil para grandes volumes de dados
main_logger.debug("Otimizando vector store FAISS...")
# O FAISS já é otimizado por padrão para volumes pequenos/médios
# Para volumes maiores, poderíamos usar índices mais sofisticados
return vector_store
except Exception as e:
main_logger.warning(f"Erro na otimização do vector store: {e}")
return vector_store # Retorna o original se a otimização falhar
def get_vector_store_info(vector_store: FAISS) -> Dict[str, Any]:
"""
Obtém informações sobre o vector store.
Args:
vector_store: Vector store FAISS
Returns:
Dict[str, Any]: Informações do vector store
"""
try:
# Informações básicas do FAISS
index = vector_store.index
return {
"total_vectors": index.ntotal,
"vector_dimension": index.d,
"index_type": type(index).__name__,
"is_trained": index.is_trained if hasattr(index, 'is_trained') else True
}
except Exception as e:
main_logger.warning(f"Erro ao obter informações do vector store: {e}")
return {
"total_vectors": 0,
"vector_dimension": 0,
"index_type": "unknown",
"is_trained": False
}