Spaces:
Runtime error
Runtime error
File size: 12,517 Bytes
7be47d4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 |
import os
import time
import uuid
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader
from langchain_google_genai import GoogleGenerativeAIEmbeddings
import logging
from pinecone import Pinecone
from app.database.pinecone import get_pinecone_index, init_pinecone
from app.database.postgresql import get_db
from app.database.models import VectorDatabase
# Module-level logger, named after this module per standard logging convention.
logger = logging.getLogger(__name__)
# Shared embeddings model used for every chunk embedding in this module.
# NOTE(review): instantiated at import time — presumably relies on ambient
# Google API credentials (e.g. environment variables); confirm against
# deployment configuration.
embeddings_model = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
class PDFProcessor:
    """Process PDF files: extract text, chunk it, embed chunks, store in Pinecone.

    The Pinecone API key is resolved either directly (``api_key``) or looked up
    in the database via ``vector_db_id``. With ``mock_mode=True`` no network
    connection is made; an in-process stub index is used instead (for tests).
    """

    def __init__(self, index_name="testbot768", namespace="Default", api_key=None, vector_db_id=None, mock_mode=False):
        """Initialize with Pinecone index name, namespace and API key.

        Args:
            index_name (str): Name of the Pinecone index to connect to.
            namespace (str): Pinecone namespace that receives this processor's vectors.
            api_key (str, optional): Pinecone API key. If omitted, it is fetched
                from the database using ``vector_db_id``.
            vector_db_id (optional): Primary key of the ``VectorDatabase`` row
                whose related API key should be used when ``api_key`` is not given.
            mock_mode (bool): When True, simulate Pinecone with a local stub.
        """
        self.index_name = index_name
        self.namespace = namespace
        self.pinecone_index = None  # lazily set by _init_pinecone_connection()
        self.api_key = api_key
        self.vector_db_id = vector_db_id
        self.pinecone_client = None
        self.mock_mode = mock_mode  # Add mock mode for testing

    def _get_api_key_from_db(self):
        """Return the Pinecone API key, preferring the one passed to __init__.

        Falls back to the ``api_key_ref`` relationship of the ``VectorDatabase``
        row identified by ``self.vector_db_id``. Returns None (and logs an
        error) when no key can be resolved.
        """
        if self.api_key:
            return self.api_key
        if not self.vector_db_id:
            logger.error("No API key provided and no vector_db_id to fetch from database")
            return None
        try:
            # Get a session from the generator-style dependency.
            db = next(get_db())
            vector_db = db.query(VectorDatabase).filter(
                VectorDatabase.id == self.vector_db_id
            ).first()
            if not vector_db:
                logger.error(f"Vector database with ID {self.vector_db_id} not found")
                return None
            # The key lives on the related api_key row, not on vector_db itself.
            if hasattr(vector_db, 'api_key_ref') and vector_db.api_key_ref and hasattr(vector_db.api_key_ref, 'key_value'):
                logger.info(f"Using API key from api_key table for vector database ID {self.vector_db_id}")
                return vector_db.api_key_ref.key_value
            logger.error(f"No API key found for vector database ID {self.vector_db_id}. Make sure the api_key_id is properly set.")
            return None
        except Exception as e:
            logger.error(f"Error fetching API key from database: {e}")
            return None

    def _init_pinecone_connection(self):
        """Connect to Pinecone and return an index handle, or None on failure.

        In mock mode a local stub object implementing ``upsert``, ``delete``
        and ``describe_index_stats`` is returned instead.
        """
        try:
            if self.mock_mode:
                logger.info("Running in mock mode - simulating Pinecone connection")
                # Capture the processor's namespace for the stub below.
                # BUG FIX: the original stub read ``self.namespace`` inside
                # describe_index_stats, but ``self`` there is the mock instance,
                # which has no ``namespace`` attribute (AttributeError).
                mock_namespace = self.namespace

                class MockPineconeIndex:
                    """In-process stand-in for a Pinecone index (no network I/O)."""

                    def upsert(self, vectors, namespace=None):
                        logger.info(f"Mock upsert: {len(vectors)} vectors to namespace '{namespace}'")
                        return {"upserted_count": len(vectors)}

                    def delete(self, ids=None, delete_all=False, namespace=None):
                        # ``ids or []`` guards against ids=None when delete_all is falsy
                        # (the original f-string raised TypeError in that case).
                        logger.info(f"Mock delete: {'all vectors' if delete_all else f'{len(ids or [])} vectors'} from namespace '{namespace}'")
                        return {"deleted_count": 10 if delete_all else len(ids or [])}

                    def describe_index_stats(self):
                        logger.info(f"Mock describe_index_stats")
                        return {"total_vector_count": 100, "namespaces": {mock_namespace: {"vector_count": 50}}}

                return MockPineconeIndex()
            # Resolve the API key (constructor arg or database lookup).
            api_key = self._get_api_key_from_db()
            if not api_key or not self.index_name:
                logger.error("Pinecone API key or index name not available")
                return None
            # New-style Pinecone client (pinecone-client v3+).
            self.pinecone_client = Pinecone(api_key=api_key)
            # Verify the index exists before connecting.
            index_list = self.pinecone_client.list_indexes()
            existing_indexes = index_list.names() if hasattr(index_list, 'names') else []
            if self.index_name not in existing_indexes:
                logger.error(f"Index {self.index_name} does not exist in Pinecone")
                return None
            index = self.pinecone_client.Index(self.index_name)
            logger.info(f"Connected to Pinecone index: {self.index_name}")
            return index
        except Exception as e:
            logger.error(f"Error connecting to Pinecone: {e}")
            return None

    async def process_pdf(self, file_path, document_id=None, metadata=None, progress_callback=None):
        """Process a PDF file: split into chunks, embed, and upsert to Pinecone.

        Args:
            file_path (str): Path to the PDF file.
            document_id (str, optional): Document ID; a new UUID is created if omitted.
            metadata (dict, optional): Extra metadata merged into each vector's
                metadata (never overrides document_id/chunk_index/text).
            progress_callback (callable, optional): Awaitable callback invoked as
                ``await cb(stage, fraction, message)`` for progress updates.

        Returns:
            dict: ``{"success": True, "document_id", "chunks_processed",
            "total_text_length"}`` on success, or ``{"success": False, "error"}``.
        """
        try:
            # (Re)establish the Pinecone connection for this run.
            self.pinecone_index = self._init_pinecone_connection()
            if not self.pinecone_index:
                return {"success": False, "error": "Could not connect to Pinecone"}
            if not document_id:
                document_id = str(uuid.uuid4())
            logger.info(f"Reading PDF file: {file_path}")
            if progress_callback:
                await progress_callback("pdf_loading", 0.5, "Loading PDF file")
            loader = PyPDFLoader(file_path)
            pages = loader.load()
            # Concatenate page text with a trailing newline per page.
            # join() avoids the quadratic cost of += in a loop; result is identical.
            all_text = "".join(page.page_content + "\n" for page in pages)
            if progress_callback:
                await progress_callback("text_extraction", 0.6, "Extracted text from PDF")
            # Split text into overlapping chunks for embedding.
            text_splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=300)
            chunks = text_splitter.split_text(all_text)
            logger.info(f"Split PDF file into {len(chunks)} chunks")
            if progress_callback:
                await progress_callback("chunking", 0.7, f"Split document into {len(chunks)} chunks")
            # Embed each chunk and upsert in batches.
            vectors = []
            for i, chunk in enumerate(chunks):
                # Report every 5 chunks to avoid flooding the callback channel.
                if progress_callback and i % 5 == 0:
                    embedding_progress = 0.7 + (0.3 * (i / len(chunks)))
                    await progress_callback("embedding", embedding_progress, f"Processing chunk {i+1}/{len(chunks)}")
                vector = embeddings_model.embed_query(chunk)
                vector_metadata = {
                    "document_id": document_id,
                    "chunk_index": i,
                    "text": chunk
                }
                # Merge caller-supplied metadata without clobbering core fields.
                if metadata:
                    for key, value in metadata.items():
                        if key not in vector_metadata:
                            vector_metadata[key] = value
                vectors.append({
                    "id": f"{document_id}_{i}",
                    "values": vector,
                    "metadata": vector_metadata
                })
                # Upsert in batches of 100 to avoid oversized requests.
                if len(vectors) >= 100:
                    await self._upsert_vectors(vectors)
                    vectors = []
            # Flush any remaining vectors.
            if vectors:
                await self._upsert_vectors(vectors)
            logger.info(f"Embedded and saved {len(chunks)} chunks from PDF with document_id: {document_id}")
            if progress_callback:
                await progress_callback("completed", 1.0, "PDF processing complete")
            return {
                "success": True,
                "document_id": document_id,
                "chunks_processed": len(chunks),
                "total_text_length": len(all_text)
            }
        except Exception as e:
            logger.error(f"Error processing PDF: {str(e)}")
            if progress_callback:
                await progress_callback("error", 0, f"Error processing PDF: {str(e)}")
            return {
                "success": False,
                "error": str(e)
            }

    async def _upsert_vectors(self, vectors):
        """Upsert a batch of vectors into the configured namespace.

        No-op for an empty batch. Raises if Pinecone is unreachable or the
        upsert fails (callers catch and report).
        """
        try:
            if not vectors:
                return
            # Lazily (re)connect if the index handle is missing.
            if not self.pinecone_index:
                self.pinecone_index = self._init_pinecone_connection()
                if not self.pinecone_index:
                    raise Exception("Cannot connect to Pinecone")
            result = self.pinecone_index.upsert(
                vectors=vectors,
                namespace=self.namespace
            )
            logger.info(f"Upserted {len(vectors)} vectors to Pinecone")
            return result
        except Exception as e:
            logger.error(f"Error upserting vectors: {str(e)}")
            raise

    async def delete_namespace(self):
        """Delete all vectors in the current namespace.

        Pinecone has no explicit "drop namespace" call; deleting every vector
        in it is the equivalent operation.

        Returns:
            dict: ``{"success": True, "detail": ...}`` or ``{"success": False, "error": ...}``.
        """
        self.pinecone_index = self._init_pinecone_connection()
        if not self.pinecone_index:
            return {"success": False, "error": "Could not connect to Pinecone"}
        try:
            result = self.pinecone_index.delete(
                delete_all=True,
                namespace=self.namespace
            )
            logger.info(f"Deleted namespace '{self.namespace}' (all vectors).")
            return {"success": True, "detail": result}
        except Exception as e:
            logger.error(f"Error deleting namespace '{self.namespace}': {e}")
            return {"success": False, "error": str(e)}

    async def list_documents(self):
        """Return index-level stats for the configured index/namespace.

        NOTE(review): despite the name, this only returns aggregate vector
        counts — Pinecone offers no cheap way to enumerate unique document_ids;
        maintain that list in a relational table instead.

        Returns:
            dict: ``{"success": True, "total_vectors", "namespace", "index_name"}``
            or ``{"success": False, "error"}``.
        """
        try:
            self.pinecone_index = self._init_pinecone_connection()
            if not self.pinecone_index:
                return {"success": False, "error": "Could not connect to Pinecone"}
            stats = self.pinecone_index.describe_index_stats()
            return {
                "success": True,
                "total_vectors": stats.get('total_vector_count', 0),
                "namespace": self.namespace,
                "index_name": self.index_name
            }
        except Exception as e:
            logger.error(f"Error getting document list: {str(e)}")
            return {
                "success": False,
                "error": str(e)
            }