File size: 9,711 Bytes
6b29104
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
"""
Nó de criação de embeddings e vector store para o AgentPDF.

Este nó é responsável por gerar embeddings dos chunks de texto
e criar um vector store FAISS para recuperação eficiente.
"""

from typing import Dict, Any, List
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_core.runnables import RunnableConfig
from langchain_core.documents import Document

from agents.state import PDFState, ProcessingStatus
from utils.config import Config, get_openai_api_key
from utils.logger import log_node_execution, main_logger


def embeddings_creation_node(state: PDFState, config: RunnableConfig) -> Dict[str, Any]:
    """
    Nó responsável por criar embeddings e vector store.
    
    Este nó:
    1. Recebe os chunks de texto processados
    2. Gera embeddings usando OpenAI
    3. Cria um vector store FAISS
    4. Atualiza o estado com o vector store
    
    Args:
        state: Estado atual do grafo contendo os chunks
        config: Configuração do LangGraph
    
    Returns:
        Dict[str, Any]: Atualizações para o estado
    """
    log_node_execution("EMBEDDINGS_CREATOR", "START", "Iniciando criação de embeddings")
    
    try:
        # Verifica se há chunks para processar
        pdf_chunks = state.get("pdf_chunks")
        if not pdf_chunks:
            error_msg = "Nenhum chunk encontrado para criar embeddings"
            log_node_execution("EMBEDDINGS_CREATOR", "ERROR", error_msg)
            return {
                "processing_status": ProcessingStatus.ERROR,
                "error_message": error_msg
            }
        
        # Verifica se a API key está configurada
        api_key = get_openai_api_key()
        if not api_key:
            error_msg = "Chave da API OpenAI não configurada"
            log_node_execution("EMBEDDINGS_CREATOR", "ERROR", error_msg)
            return {
                "processing_status": ProcessingStatus.ERROR,
                "error_message": error_msg
            }
        
        log_node_execution(
            "EMBEDDINGS_CREATOR", 
            "PROCESSING", 
            f"Criando embeddings para {len(pdf_chunks)} chunks"
        )
        
        # Cria o modelo de embeddings
        embeddings_model = create_embeddings_model()
        
        # Converte chunks em documentos
        documents = create_documents_from_chunks(pdf_chunks)
        
        # Cria o vector store
        vector_store = create_vector_store(documents, embeddings_model)
        
        log_node_execution(
            "EMBEDDINGS_CREATOR", 
            "SUCCESS", 
            f"Vector store criado com {len(documents)} documentos"
        )
        
        return {
            "vector_store": vector_store,
            "embeddings_created": True,
            "processing_status": ProcessingStatus.IDLE,  # Pronto para perguntas
            "error_message": None
        }
        
    except Exception as e:
        error_msg = f"Erro ao criar embeddings: {str(e)}"
        log_node_execution("EMBEDDINGS_CREATOR", "ERROR", error_msg)
        main_logger.exception("Erro detalhado na criação de embeddings:")
        
        return {
            "processing_status": ProcessingStatus.ERROR,
            "error_message": error_msg,
            "embeddings_created": False
        }


def create_embeddings_model() -> OpenAIEmbeddings:
    """
    Cria e configura o modelo de embeddings OpenAI.
    
    Returns:
        OpenAIEmbeddings: Modelo de embeddings configurado
    """
    try:
        embeddings = OpenAIEmbeddings(
            openai_api_key=get_openai_api_key(),
            model="text-embedding-3-small",  # Modelo mais eficiente
            chunk_size=1000,  # Tamanho do chunk para embeddings
            max_retries=3,
            timeout=30
        )
        
        main_logger.debug("Modelo de embeddings OpenAI criado com sucesso")
        return embeddings
        
    except Exception as e:
        main_logger.error(f"Erro ao criar modelo de embeddings: {e}")
        raise


def create_documents_from_chunks(chunks: List[str]) -> List[Document]:
    """
    Converte chunks de texto em objetos Document do LangChain.
    
    Args:
        chunks: Lista de chunks de texto
    
    Returns:
        List[Document]: Lista de documentos LangChain
    """
    documents = []
    
    for i, chunk in enumerate(chunks):
        # Cria metadados para cada documento
        metadata = {
            "chunk_id": i,
            "chunk_size": len(chunk),
            "source": "pdf_upload",
            "chunk_index": i
        }
        
        # Cria o documento
        doc = Document(
            page_content=chunk,
            metadata=metadata
        )
        
        documents.append(doc)
    
    main_logger.debug(f"Criados {len(documents)} documentos a partir dos chunks")
    return documents


def create_vector_store(documents: List[Document], embeddings_model: OpenAIEmbeddings) -> FAISS:
    """
    Cria um vector store FAISS a partir dos documentos.
    
    Args:
        documents: Lista de documentos
        embeddings_model: Modelo de embeddings
    
    Returns:
        FAISS: Vector store criado
    """
    try:
        main_logger.info("Criando vector store FAISS...")
        
        # Cria o vector store
        vector_store = FAISS.from_documents(
            documents=documents,
            embedding=embeddings_model
        )
        
        main_logger.info(f"Vector store FAISS criado com {len(documents)} documentos")
        
        # Log estatísticas
        log_vector_store_stats(vector_store, documents)
        
        return vector_store
        
    except Exception as e:
        main_logger.error(f"Erro ao criar vector store FAISS: {e}")
        raise


def log_vector_store_stats(vector_store: FAISS, documents: List[Document]):
    """
    Registra estatísticas do vector store criado.
    
    Args:
        vector_store: Vector store FAISS
        documents: Lista de documentos
    """
    try:
        # Estatísticas básicas
        total_docs = len(documents)
        total_chars = sum(len(doc.page_content) for doc in documents)
        avg_doc_size = total_chars / total_docs if total_docs > 0 else 0
        
        main_logger.info(f"📊 Estatísticas do Vector Store:")
        main_logger.info(f"   • Total de documentos: {total_docs}")
        main_logger.info(f"   • Total de caracteres: {total_chars:,}")
        main_logger.info(f"   • Tamanho médio por documento: {avg_doc_size:.0f} caracteres")
        
        # Testa uma busca simples para verificar funcionamento
        test_results = vector_store.similarity_search("teste", k=1)
        main_logger.debug(f"Teste de busca retornou {len(test_results)} resultado(s)")
        
    except Exception as e:
        main_logger.warning(f"Erro ao calcular estatísticas do vector store: {e}")


def test_vector_store(vector_store: FAISS, test_query: str = "informação") -> bool:
    """
    Testa o funcionamento do vector store.
    
    Args:
        vector_store: Vector store para testar
        test_query: Query de teste
    
    Returns:
        bool: True se o teste passou
    """
    try:
        # Testa busca por similaridade
        results = vector_store.similarity_search(test_query, k=3)
        
        if not results:
            main_logger.warning("Vector store não retornou resultados para query de teste")
            return False
        
        # Testa busca com score
        results_with_score = vector_store.similarity_search_with_score(test_query, k=3)
        
        if not results_with_score:
            main_logger.warning("Vector store não retornou scores para query de teste")
            return False
        
        main_logger.debug(f"Teste do vector store passou: {len(results)} resultados encontrados")
        return True
        
    except Exception as e:
        main_logger.error(f"Erro no teste do vector store: {e}")
        return False


def optimize_vector_store(vector_store: FAISS) -> FAISS:
    """
    Otimiza o vector store para melhor performance.
    
    Args:
        vector_store: Vector store original
    
    Returns:
        FAISS: Vector store otimizado
    """
    try:
        # Para FAISS, podemos otimizar o índice
        # Isso é especialmente útil para grandes volumes de dados
        
        main_logger.debug("Otimizando vector store FAISS...")
        
        # O FAISS já é otimizado por padrão para volumes pequenos/médios
        # Para volumes maiores, poderíamos usar índices mais sofisticados
        
        return vector_store
        
    except Exception as e:
        main_logger.warning(f"Erro na otimização do vector store: {e}")
        return vector_store  # Retorna o original se a otimização falhar


def get_vector_store_info(vector_store: FAISS) -> Dict[str, Any]:
    """
    Obtém informações sobre o vector store.
    
    Args:
        vector_store: Vector store FAISS
    
    Returns:
        Dict[str, Any]: Informações do vector store
    """
    try:
        # Informações básicas do FAISS
        index = vector_store.index
        
        return {
            "total_vectors": index.ntotal,
            "vector_dimension": index.d,
            "index_type": type(index).__name__,
            "is_trained": index.is_trained if hasattr(index, 'is_trained') else True
        }
        
    except Exception as e:
        main_logger.warning(f"Erro ao obter informações do vector store: {e}")
        return {
            "total_vectors": 0,
            "vector_dimension": 0,
            "index_type": "unknown",
            "is_trained": False
        }