import json
import os
import re
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed

import fitz  # PyMuPDF
import pinecone
import requests
from dotenv import load_dotenv
from langchain_experimental.text_splitter import SemanticChunker
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from pymongo import MongoClient
# Load environment variables
load_dotenv()
MONGO_URI = os.getenv("MONGO_URI")
DB_NAME = os.getenv("DB_NAME")
COLLECTION_NAME = os.getenv("COLLECTION_NAME")
FLASH_API = os.getenv("FLASH_API")
PINECONE_API = os.getenv("PINECONE_API")
PINECONE_INDEX = os.getenv("PINECONE_INDEX")
# Initialize services
mongo_client = MongoClient(MONGO_URI)
db = mongo_client[DB_NAME]
collection = db[COLLECTION_NAME]
embed_model = GoogleGenerativeAIEmbeddings(
    model="models/embedding-001",
    google_api_key=FLASH_API,
)
pc = pinecone.Pinecone(api_key=PINECONE_API)
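# NOTE (assumption): models/embedding-001 produces 768-dimensional embeddings,
# so the Pinecone index below is expected to have been created with dimension=768.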
index = pc.Index(PINECONE_INDEX)
# Directories for temp files
pdf_temp_dir = 'temp/pdf_files'
os.makedirs(pdf_temp_dir, exist_ok=True)
pdf_path = os.path.join(pdf_temp_dir, 'downloaded_file.pdf')
def download_pdf(url):
    """Download the PDF at `url` to the temp path; return the path, or None on failure."""
    try:
        # Timeout and status check so a bad URL fails fast instead of hanging
        # or silently writing an error page to disk
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        with open(pdf_path, 'wb') as pdf_file:
            pdf_file.write(response.content)
        return pdf_path
    except Exception as e:
        print(f"Error downloading PDF: {e}")
        return None
def semantic_chunking_parallel(pages, url):
"""
Perform semantic chunking for pages in parallel and store chunks in MongoDB.
Each chunk is assigned a unique chunk_id.
"""
semantic_chunker = SemanticChunker(embed_model, breakpoint_threshold_type="percentile")
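    # With breakpoint_threshold_type="percentile", SemanticChunker embeds each
    # sentence and starts a new chunk wherever the distance between adjacent
    # sentence embeddings exceeds the chosen percentile of all such distances.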
def process_page(page):
try:
page_content = page["page_content"]
page_number = page["page_number"]
page_chunks = semantic_chunker.create_documents([page_content])
enriched_chunks = []
for chunk in page_chunks:
chunk_id = str(uuid.uuid4()) # Generate a unique ID for the chunk
enriched_chunks.append({
"chunk_id": chunk_id,
"chunk": chunk.page_content,
"page_number": page_number
})
return enriched_chunks
except Exception as e:
print(f"Error processing page {page['page_number']}: {e}")
return []
all_chunks = []
with ThreadPoolExecutor(max_workers=8) as executor:
futures = {executor.submit(process_page, page): page for page in pages}
for future in as_completed(futures):
all_chunks.extend(future.result())
    # Insert all chunks into MongoDB in a single update instead of one round trip per chunk
    if all_chunks:
        collection.update_one(
            {"object_url": url},
            {"$addToSet": {"chunks": {"$each": all_chunks}}},
            upsert=True
        )
return all_chunks
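# Illustrative shape of the MongoDB document after chunking (values are hypothetical):
# {
#     "object_url": "https://example.com/file.pdf",
#     "chunks": [
#         {"chunk_id": "1b9d6bcd-...", "chunk": "First semantic chunk text...", "page_number": 1},
#         {"chunk_id": "6ec0bd7f-...", "chunk": "Second semantic chunk text...", "page_number": 1},
#     ]
# }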
def create_embedding_parallel(url, tags, categories):
"""
Create embeddings for chunks retrieved from MongoDB and insert them into Pinecone.
"""
    document = collection.find_one({"object_url": url})
    if not document or "chunks" not in document:
        print(f"No chunks found for URL: {url}")
        return 0
    # Look up the Mongo _id only after confirming the document exists
    mongo_id = str(document["_id"])
chunks = document["chunks"]
def process_chunk(chunk):
try:
chunk_id = chunk["chunk_id"]
description = chunk["chunk"]
page_number = chunk["page_number"]
embedding = embed_model.embed_query(description)
            # Deterministic ID seeded with the URL and page number so identical text on
            # different pages or documents does not collapse to a single vector
            pinecone_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, f"{url}|{page_number}|{description}"))
# Insert into Pinecone
vector = {
'id': pinecone_id,
'values': embedding,
'metadata': {
'description': description,
"url": url,
"page_number": page_number,
"tags": ','.join(tags),
"categories": ','.join(categories),
"filetype":"PDF",
"mongo_id": mongo_id,
}
}
index.upsert([vector])
print(f"Inserted chunk {chunk_id} from page {page_number} into Pinecone.")
# Update MongoDB with Pinecone ID
collection.update_one(
{"object_url": url, "chunks.chunk_id": chunk_id},
{
"$set": {
"chunks.$.pinecone_id": pinecone_id,
"chunks.$.successfully_embedding_created": True
}
}
)
return True
except Exception as e:
print(f"Error creating embedding for chunk {chunk['chunk_id']}: {e}")
return False
failed_chunks = 0
with ThreadPoolExecutor(max_workers=8) as executor:
futures = {executor.submit(process_chunk, chunk): chunk for chunk in chunks}
for future in as_completed(futures):
if not future.result():
failed_chunks += 1
return failed_chunks
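# Sketch of the read path (not used by this pipeline): embed a free-text query with
# the same model and search the index. The function name and top_k are illustrative.
def search_chunks(query, top_k=5):
    query_embedding = embed_model.embed_query(query)
    results = index.query(vector=query_embedding, top_k=top_k, include_metadata=True)
    # Each match carries the metadata stored at upsert time (url, page_number, tags, ...)
    return [(match.score, match.metadata) for match in results.matches]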
def process_pdf(url, tags, categories):
    """
    Process a PDF: download, chunk, and create embeddings.
    Returns True if at least one chunk was embedded successfully.
    """
    print(f"Processing PDF with URL: {url}")
    if not download_pdf(url):
        print(f"Download failed for URL: {url}")
        return False
    all_pages, total_pages = extract_text_from_pdf(pdf_path)
    print(f"Total pages: {total_pages}")
    print("Performing page-level semantic chunking in parallel...")
    semantic_chunks_with_pages = semantic_chunking_parallel(all_pages, url)
    print(f"Total chunks inserted into MongoDB: {len(semantic_chunks_with_pages)}")
    print("Creating embeddings and inserting into Pinecone in parallel...")
    failed_chunks = create_embedding_parallel(url, tags, categories)
    print(f"Total chunks successfully embedded: {len(semantic_chunks_with_pages) - failed_chunks}")
    print(f"Total chunks failed: {failed_chunks}")
    return failed_chunks < len(semantic_chunks_with_pages)
def extract_text_from_pdf(pdf_path):
"""
Extract text from each page of a PDF using fitz.
"""
pdf_document = fitz.open(pdf_path)
try:
all_pages = []
total_pages = len(pdf_document)
for page_num in range(total_pages):
page = pdf_document[page_num]
text = page.get_text()
all_pages.append({"page_content": text, "page_number": page_num + 1})
finally:
pdf_document.close()
return all_pages, total_pages
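# Example invocation (the URL, tags, and categories below are hypothetical):
if __name__ == "__main__":
    ok = process_pdf(
        "https://example.com/sample.pdf",
        tags=["finance", "quarterly"],
        categories=["reports"],
    )
    print("Pipeline succeeded" if ok else "Pipeline failed")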