File size: 12,517 Bytes
7be47d4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import os
import time
import uuid
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
from langchain_google_genai import GoogleGenerativeAIEmbeddings
import logging
from pinecone import Pinecone

from app.database.pinecone import get_pinecone_index, init_pinecone
from app.database.postgresql import get_db
from app.database.models import VectorDatabase

# Configure logging
logger = logging.getLogger(__name__)

# Initialize embeddings model
embeddings_model = GoogleGenerativeAIEmbeddings(model="models/embedding-001")

class PDFProcessor:
    """Process PDF files: extract text, chunk it, embed each chunk with the
    module-level Google embeddings model, and upsert the vectors into a
    Pinecone index/namespace.

    A ``mock_mode`` flag substitutes an in-memory fake Pinecone index so the
    pipeline can be exercised without network access or credentials.
    """

    def __init__(self, index_name="testbot768", namespace="Default", api_key=None, vector_db_id=None, mock_mode=False):
        """Initialize with Pinecone index name, namespace and API key.

        Args:
            index_name (str): Name of the Pinecone index to connect to.
            namespace (str): Pinecone namespace used for all vector operations.
            api_key (str, optional): Pinecone API key. When omitted, the key is
                resolved from the database via ``vector_db_id``.
            vector_db_id (optional): Primary key of the ``VectorDatabase`` row
                whose related api_key record supplies the key when ``api_key``
                is not given.
            mock_mode (bool): When True, never contact Pinecone; use an
                in-process mock index instead (for testing).
        """
        self.index_name = index_name
        self.namespace = namespace
        self.pinecone_index = None
        self.api_key = api_key
        self.vector_db_id = vector_db_id
        self.pinecone_client = None
        self.mock_mode = mock_mode  # Add mock mode for testing

    def _get_api_key_from_db(self):
        """Return the Pinecone API key, preferring the one given at init time.

        Falls back to loading the ``VectorDatabase`` row identified by
        ``self.vector_db_id`` and reading its related api_key record.

        Returns:
            str | None: The key, or None (with an error logged) when it
            cannot be resolved.
        """
        if self.api_key:
            return self.api_key

        if not self.vector_db_id:
            logger.error("No API key provided and no vector_db_id to fetch from database")
            return None

        db = None
        try:
            # Get database session
            db = next(get_db())

            # Get vector database
            vector_db = db.query(VectorDatabase).filter(
                VectorDatabase.id == self.vector_db_id
            ).first()

            if not vector_db:
                logger.error(f"Vector database with ID {self.vector_db_id} not found")
                return None

            # Get API key from relationship
            if hasattr(vector_db, 'api_key_ref') and vector_db.api_key_ref and hasattr(vector_db.api_key_ref, 'key_value'):
                logger.info(f"Using API key from api_key table for vector database ID {self.vector_db_id}")
                return vector_db.api_key_ref.key_value

            logger.error(f"No API key found for vector database ID {self.vector_db_id}. Make sure the api_key_id is properly set.")
            return None
        except Exception as e:
            logger.error(f"Error fetching API key from database: {e}")
            return None
        finally:
            # BUGFIX: the session obtained via next(get_db()) was previously
            # never closed (the generator was abandoned un-exhausted), leaking
            # a connection on every lookup. Close it explicitly.
            if db is not None:
                db.close()

    def _init_pinecone_connection(self):
        """Initialize connection to Pinecone with new API.

        Returns:
            The connected index object (or a mock index in mock mode), or
            None when the key/index cannot be resolved or connection fails.
        """
        try:
            # If in mock mode, return a mock index
            if self.mock_mode:
                logger.info("Running in mock mode - simulating Pinecone connection")

                # BUGFIX: the nested class methods previously referenced
                # ``self.namespace``, but inside MockPineconeIndex ``self`` is
                # the mock index (which has no ``namespace`` attribute), so
                # describe_index_stats raised AttributeError. Bind the
                # processor's namespace in the enclosing scope instead.
                mock_namespace = self.namespace

                class MockPineconeIndex:
                    def upsert(self, vectors, namespace=None):
                        logger.info(f"Mock upsert: {len(vectors)} vectors to namespace '{namespace}'")
                        return {"upserted_count": len(vectors)}

                    def delete(self, ids=None, delete_all=False, namespace=None):
                        # len(ids or []) guards against ids=None (the original
                        # log message raised TypeError in that case).
                        logger.info(f"Mock delete: {'all vectors' if delete_all else f'{len(ids or [])} vectors'} from namespace '{namespace}'")
                        return {"deleted_count": 10 if delete_all else len(ids or [])}

                    def describe_index_stats(self):
                        logger.info("Mock describe_index_stats")
                        return {"total_vector_count": 100, "namespaces": {mock_namespace: {"vector_count": 50}}}

                return MockPineconeIndex()

            # Get API key from database if not provided
            api_key = self._get_api_key_from_db()

            if not api_key or not self.index_name:
                logger.error("Pinecone API key or index name not available")
                return None

            # Initialize Pinecone client using the new API
            self.pinecone_client = Pinecone(api_key=api_key)

            # Get the index; list_indexes() may return an object with .names()
            # or a plain structure depending on client version.
            index_list = self.pinecone_client.list_indexes()
            existing_indexes = index_list.names() if hasattr(index_list, 'names') else []

            if self.index_name not in existing_indexes:
                logger.error(f"Index {self.index_name} does not exist in Pinecone")
                return None

            # Connect to the index
            index = self.pinecone_client.Index(self.index_name)
            logger.info(f"Connected to Pinecone index: {self.index_name}")
            return index
        except Exception as e:
            logger.error(f"Error connecting to Pinecone: {e}")
            return None

    async def process_pdf(self, file_path, document_id=None, metadata=None, progress_callback=None):
        """
        Process PDF file, split into chunks and create embeddings

        Args:
            file_path (str): Path to the PDF file
            document_id (str, optional): Document ID, if not provided a new ID will be created
            metadata (dict, optional): Additional metadata for the document
            progress_callback (callable, optional): Async callback invoked as
                ``await progress_callback(stage, fraction, message)``

        Returns:
            dict: Processing result information including document_id and processed chunks count
        """
        try:
            # Initialize Pinecone connection if not already done
            self.pinecone_index = self._init_pinecone_connection()
            if not self.pinecone_index:
                return {"success": False, "error": "Could not connect to Pinecone"}

            # Create document_id if not provided
            if not document_id:
                document_id = str(uuid.uuid4())

            # Load PDF using PyPDFLoader
            logger.info(f"Reading PDF file: {file_path}")
            if progress_callback:
                await progress_callback("pdf_loading", 0.5, "Loading PDF file")

            loader = PyPDFLoader(file_path)
            pages = loader.load()

            # Extract and concatenate text from all pages
            # (join is linear; repeated += would be quadratic on large PDFs)
            all_text = "\n".join(page.page_content for page in pages) + "\n"

            if progress_callback:
                await progress_callback("text_extraction", 0.6, "Extracted text from PDF")

            # Split text into chunks
            text_splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=300)
            chunks = text_splitter.split_text(all_text)

            logger.info(f"Split PDF file into {len(chunks)} chunks")
            if progress_callback:
                await progress_callback("chunking", 0.7, f"Split document into {len(chunks)} chunks")

            # Process embeddings for each chunk and upsert to Pinecone
            vectors = []
            for i, chunk in enumerate(chunks):
                # Update embedding progress
                if progress_callback and i % 5 == 0:  # Update every 5 chunks to avoid too many notifications
                    embedding_progress = 0.7 + (0.3 * (i / len(chunks)))
                    await progress_callback("embedding", embedding_progress, f"Processing chunk {i+1}/{len(chunks)}")

                # Create vector embedding for each chunk
                vector = embeddings_model.embed_query(chunk)

                # Prepare metadata for vector; caller metadata must not
                # override the reserved keys below.
                vector_metadata = {
                    "document_id": document_id,
                    "chunk_index": i,
                    "text": chunk
                }

                # Add additional metadata if provided
                if metadata:
                    for key, value in metadata.items():
                        if key not in vector_metadata:
                            vector_metadata[key] = value

                # Add vector to list for upserting
                vectors.append({
                    "id": f"{document_id}_{i}",
                    "values": vector,
                    "metadata": vector_metadata
                })

                # Upsert in batches of 100 to avoid overloading
                if len(vectors) >= 100:
                    await self._upsert_vectors(vectors)
                    vectors = []

            # Upsert any remaining vectors
            if vectors:
                await self._upsert_vectors(vectors)

            logger.info(f"Embedded and saved {len(chunks)} chunks from PDF with document_id: {document_id}")

            # Final progress update
            if progress_callback:
                await progress_callback("completed", 1.0, "PDF processing complete")

            return {
                "success": True,
                "document_id": document_id,
                "chunks_processed": len(chunks),
                "total_text_length": len(all_text)
            }

        except Exception as e:
            logger.error(f"Error processing PDF: {str(e)}")
            if progress_callback:
                await progress_callback("error", 0, f"Error processing PDF: {str(e)}")
            return {
                "success": False,
                "error": str(e)
            }

    async def _upsert_vectors(self, vectors):
        """Upsert a batch of vectors into the configured Pinecone namespace.

        Args:
            vectors (list[dict]): Items with ``id``, ``values``, ``metadata``.

        Returns:
            The Pinecone upsert response, or None when ``vectors`` is empty.

        Raises:
            Exception: When no connection can be established or the upsert
            fails (re-raised so callers can abort the batch loop).
        """
        try:
            if not vectors:
                return

            # Ensure we have a valid pinecone_index
            if not self.pinecone_index:
                self.pinecone_index = self._init_pinecone_connection()
                if not self.pinecone_index:
                    raise Exception("Cannot connect to Pinecone")

            result = self.pinecone_index.upsert(
                vectors=vectors,
                namespace=self.namespace
            )

            logger.info(f"Upserted {len(vectors)} vectors to Pinecone")
            return result
        except Exception as e:
            logger.error(f"Error upserting vectors: {str(e)}")
            raise

    async def delete_namespace(self):
        """
        Delete all vectors in the current namespace (equivalent to deleting the namespace).

        Returns:
            dict: ``{"success": True, "detail": ...}`` or
            ``{"success": False, "error": ...}``.
        """
        # Initialize connection if needed
        self.pinecone_index = self._init_pinecone_connection()
        if not self.pinecone_index:
            return {"success": False, "error": "Could not connect to Pinecone"}

        try:
            # delete_all=True will delete all vectors in the namespace
            result = self.pinecone_index.delete(
                delete_all=True,
                namespace=self.namespace
            )
            logger.info(f"Deleted namespace '{self.namespace}' (all vectors).")
            return {"success": True, "detail": result}
        except Exception as e:
            logger.error(f"Error deleting namespace '{self.namespace}': {e}")
            return {"success": False, "error": str(e)}

    async def list_documents(self):
        """Return index-level stats for the connected Pinecone index.

        NOTE: this does not actually enumerate document_ids — scanning all
        vectors would be inefficient at scale; maintain document ids in a
        separate database instead.

        Returns:
            dict: success flag plus ``total_vectors``, ``namespace`` and
            ``index_name``, or an error payload.
        """
        try:
            # Initialize Pinecone connection if not already done
            self.pinecone_index = self._init_pinecone_connection()
            if not self.pinecone_index:
                return {"success": False, "error": "Could not connect to Pinecone"}

            # Get index information
            stats = self.pinecone_index.describe_index_stats()

            return {
                "success": True,
                "total_vectors": stats.get('total_vector_count', 0),
                "namespace": self.namespace,
                "index_name": self.index_name
            }
        except Exception as e:
            logger.error(f"Error getting document list: {str(e)}")
            return {
                "success": False,
                "error": str(e)
            }