File size: 9,711 Bytes
6b29104 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 | """
Nó de criação de embeddings e vector store para o AgentPDF.
Este nó é responsável por gerar embeddings dos chunks de texto
e criar um vector store FAISS para recuperação eficiente.
"""
from typing import Dict, Any, List
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.runnables import RunnableConfig
from langchain_core.documents import Document
from agents.state import PDFState, ProcessingStatus
from utils.config import Config, get_openai_api_key
from utils.logger import log_node_execution, main_logger
def embeddings_creation_node(state: PDFState, config: RunnableConfig) -> Dict[str, Any]:
"""
Nó responsável por criar embeddings e vector store.
Este nó:
1. Recebe os chunks de texto processados
2. Gera embeddings usando OpenAI
3. Cria um vector store FAISS
4. Atualiza o estado com o vector store
Args:
state: Estado atual do grafo contendo os chunks
config: Configuração do LangGraph
Returns:
Dict[str, Any]: Atualizações para o estado
"""
log_node_execution("EMBEDDINGS_CREATOR", "START", "Iniciando criação de embeddings")
try:
# Verifica se há chunks para processar
pdf_chunks = state.get("pdf_chunks")
if not pdf_chunks:
error_msg = "Nenhum chunk encontrado para criar embeddings"
log_node_execution("EMBEDDINGS_CREATOR", "ERROR", error_msg)
return {
"processing_status": ProcessingStatus.ERROR,
"error_message": error_msg
}
# Verifica se a API key está configurada
api_key = get_openai_api_key()
if not api_key:
error_msg = "Chave da API OpenAI não configurada"
log_node_execution("EMBEDDINGS_CREATOR", "ERROR", error_msg)
return {
"processing_status": ProcessingStatus.ERROR,
"error_message": error_msg
}
log_node_execution(
"EMBEDDINGS_CREATOR",
"PROCESSING",
f"Criando embeddings para {len(pdf_chunks)} chunks"
)
# Cria o modelo de embeddings
embeddings_model = create_embeddings_model()
# Converte chunks em documentos
documents = create_documents_from_chunks(pdf_chunks)
# Cria o vector store
vector_store = create_vector_store(documents, embeddings_model)
log_node_execution(
"EMBEDDINGS_CREATOR",
"SUCCESS",
f"Vector store criado com {len(documents)} documentos"
)
return {
"vector_store": vector_store,
"embeddings_created": True,
"processing_status": ProcessingStatus.IDLE, # Pronto para perguntas
"error_message": None
}
except Exception as e:
error_msg = f"Erro ao criar embeddings: {str(e)}"
log_node_execution("EMBEDDINGS_CREATOR", "ERROR", error_msg)
main_logger.exception("Erro detalhado na criação de embeddings:")
return {
"processing_status": ProcessingStatus.ERROR,
"error_message": error_msg,
"embeddings_created": False
}
def create_embeddings_model() -> OpenAIEmbeddings:
"""
Cria e configura o modelo de embeddings OpenAI.
Returns:
OpenAIEmbeddings: Modelo de embeddings configurado
"""
try:
embeddings = OpenAIEmbeddings(
openai_api_key=get_openai_api_key(),
model="text-embedding-3-small", # Modelo mais eficiente
chunk_size=1000, # Tamanho do chunk para embeddings
max_retries=3,
timeout=30
)
main_logger.debug("Modelo de embeddings OpenAI criado com sucesso")
return embeddings
except Exception as e:
main_logger.error(f"Erro ao criar modelo de embeddings: {e}")
raise
def create_documents_from_chunks(chunks: List[str]) -> List[Document]:
"""
Converte chunks de texto em objetos Document do LangChain.
Args:
chunks: Lista de chunks de texto
Returns:
List[Document]: Lista de documentos LangChain
"""
documents = []
for i, chunk in enumerate(chunks):
# Cria metadados para cada documento
metadata = {
"chunk_id": i,
"chunk_size": len(chunk),
"source": "pdf_upload",
"chunk_index": i
}
# Cria o documento
doc = Document(
page_content=chunk,
metadata=metadata
)
documents.append(doc)
main_logger.debug(f"Criados {len(documents)} documentos a partir dos chunks")
return documents
def create_vector_store(documents: List[Document], embeddings_model: OpenAIEmbeddings) -> FAISS:
"""
Cria um vector store FAISS a partir dos documentos.
Args:
documents: Lista de documentos
embeddings_model: Modelo de embeddings
Returns:
FAISS: Vector store criado
"""
try:
main_logger.info("Criando vector store FAISS...")
# Cria o vector store
vector_store = FAISS.from_documents(
documents=documents,
embedding=embeddings_model
)
main_logger.info(f"Vector store FAISS criado com {len(documents)} documentos")
# Log estatísticas
log_vector_store_stats(vector_store, documents)
return vector_store
except Exception as e:
main_logger.error(f"Erro ao criar vector store FAISS: {e}")
raise
def log_vector_store_stats(vector_store: FAISS, documents: List[Document]):
"""
Registra estatísticas do vector store criado.
Args:
vector_store: Vector store FAISS
documents: Lista de documentos
"""
try:
# Estatísticas básicas
total_docs = len(documents)
total_chars = sum(len(doc.page_content) for doc in documents)
avg_doc_size = total_chars / total_docs if total_docs > 0 else 0
main_logger.info(f"📊 Estatísticas do Vector Store:")
main_logger.info(f" • Total de documentos: {total_docs}")
main_logger.info(f" • Total de caracteres: {total_chars:,}")
main_logger.info(f" • Tamanho médio por documento: {avg_doc_size:.0f} caracteres")
# Testa uma busca simples para verificar funcionamento
test_results = vector_store.similarity_search("teste", k=1)
main_logger.debug(f"Teste de busca retornou {len(test_results)} resultado(s)")
except Exception as e:
main_logger.warning(f"Erro ao calcular estatísticas do vector store: {e}")
def test_vector_store(vector_store: FAISS, test_query: str = "informação") -> bool:
"""
Testa o funcionamento do vector store.
Args:
vector_store: Vector store para testar
test_query: Query de teste
Returns:
bool: True se o teste passou
"""
try:
# Testa busca por similaridade
results = vector_store.similarity_search(test_query, k=3)
if not results:
main_logger.warning("Vector store não retornou resultados para query de teste")
return False
# Testa busca com score
results_with_score = vector_store.similarity_search_with_score(test_query, k=3)
if not results_with_score:
main_logger.warning("Vector store não retornou scores para query de teste")
return False
main_logger.debug(f"Teste do vector store passou: {len(results)} resultados encontrados")
return True
except Exception as e:
main_logger.error(f"Erro no teste do vector store: {e}")
return False
def optimize_vector_store(vector_store: FAISS) -> FAISS:
"""
Otimiza o vector store para melhor performance.
Args:
vector_store: Vector store original
Returns:
FAISS: Vector store otimizado
"""
try:
# Para FAISS, podemos otimizar o índice
# Isso é especialmente útil para grandes volumes de dados
main_logger.debug("Otimizando vector store FAISS...")
# O FAISS já é otimizado por padrão para volumes pequenos/médios
# Para volumes maiores, poderíamos usar índices mais sofisticados
return vector_store
except Exception as e:
main_logger.warning(f"Erro na otimização do vector store: {e}")
return vector_store # Retorna o original se a otimização falhar
def get_vector_store_info(vector_store: FAISS) -> Dict[str, Any]:
"""
Obtém informações sobre o vector store.
Args:
vector_store: Vector store FAISS
Returns:
Dict[str, Any]: Informações do vector store
"""
try:
# Informações básicas do FAISS
index = vector_store.index
return {
"total_vectors": index.ntotal,
"vector_dimension": index.d,
"index_type": type(index).__name__,
"is_trained": index.is_trained if hasattr(index, 'is_trained') else True
}
except Exception as e:
main_logger.warning(f"Erro ao obter informações do vector store: {e}")
return {
"total_vectors": 0,
"vector_dimension": 0,
"index_type": "unknown",
"is_trained": False
}
|