# helper/process_pdf.py
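"""Pipeline for ingesting a PDF: download it, extract text per page,
semantically chunk each page, store the chunks in MongoDB, and index
their embeddings in Pinecone."""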
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import fitz
from pymongo import MongoClient
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_experimental.text_splitter import SemanticChunker
import os
import re
import json
import uuid
from dotenv import load_dotenv
import pinecone
# Load environment variables
load_dotenv()
MONGO_URI = os.getenv("MONGO_URI")
DB_NAME = os.getenv("DB_NAME")
COLLECTION_NAME = os.getenv("COLLECTION_NAME")
FLASH_API = os.getenv("FLASH_API")
PINECONE_API = os.getenv("PINECONE_API")
PINECONE_INDEX = os.getenv("PINECONE_INDEX")
# Initialize services
mongo_client = MongoClient(MONGO_URI)
db = mongo_client[DB_NAME]
collection = db[COLLECTION_NAME]
embed_model = GoogleGenerativeAIEmbeddings(
    model="models/embedding-001",
    google_api_key=FLASH_API
)
pc = pinecone.Pinecone(api_key=PINECONE_API)
index = pc.Index(PINECONE_INDEX)
# Directories for temp files
pdf_temp_dir = 'temp/pdf_files'
os.makedirs(pdf_temp_dir, exist_ok=True)
pdf_path = os.path.join(pdf_temp_dir, 'downloaded_file.pdf')
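# NOTE: every download reuses this single path, so process_pdf should not be
# called concurrently for different URLs.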
def download_pdf(url):
    """Download the PDF at `url` to the shared temp path."""
    try:
        response = requests.get(url, timeout=60)
        response.raise_for_status()  # treat HTTP error responses as failures
        with open(pdf_path, 'wb') as pdf_file:
            pdf_file.write(response.content)
        return pdf_path
    except Exception as e:
        print(f"Error downloading PDF: {e}")
        return None
def semantic_chunking_parallel(pages, url):
"""
Perform semantic chunking for pages in parallel and store chunks in MongoDB.
Each chunk is assigned a unique chunk_id.
"""
semantic_chunker = SemanticChunker(embed_model, breakpoint_threshold_type="percentile")
def process_page(page):
try:
page_content = page["page_content"]
page_number = page["page_number"]
page_chunks = semantic_chunker.create_documents([page_content])
enriched_chunks = []
for chunk in page_chunks:
chunk_id = str(uuid.uuid4()) # Generate a unique ID for the chunk
enriched_chunks.append({
"chunk_id": chunk_id,
"chunk": chunk.page_content,
"page_number": page_number
})
return enriched_chunks
        except Exception as e:
            print(f"Error processing page {page.get('page_number')}: {e}")
            return []
all_chunks = []
with ThreadPoolExecutor(max_workers=8) as executor:
futures = {executor.submit(process_page, page): page for page in pages}
for future in as_completed(futures):
all_chunks.extend(future.result())
    # Insert all chunks into MongoDB in one update instead of one round trip
    # per chunk; $addToSet with $each still skips exact duplicates.
    if all_chunks:
        collection.update_one(
            {"object_url": url},
            {"$addToSet": {"chunks": {"$each": all_chunks}}},
            upsert=True
        )
return all_chunks
def create_embedding_parallel(url, tags, categories):
"""
Create embeddings for chunks retrieved from MongoDB and insert them into Pinecone.
"""
    document = collection.find_one({"object_url": url})
    if not document or "chunks" not in document:
        print(f"No chunks found for URL: {url}")
        return 0
    mongo_id = str(document["_id"])
chunks = document["chunks"]
def process_chunk(chunk):
try:
chunk_id = chunk["chunk_id"]
description = chunk["chunk"]
page_number = chunk["page_number"]
embedding = embed_model.embed_query(description)
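            # uuid5 is deterministic over the chunk text, so re-ingesting the
            # same text overwrites the existing vector instead of duplicating it.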
pinecone_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, description))
            # Build the vector and upsert it into Pinecone
            vector = {
                "id": pinecone_id,
                "values": embedding,
                "metadata": {
                    "description": description,
                    "url": url,
                    "page_number": page_number,
                    "tags": ",".join(tags),
                    "categories": ",".join(categories),
                    "filetype": "PDF",
                    "mongo_id": mongo_id,
                }
            }
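            # Upserting one vector per call keeps this simple; index.upsert()
            # also accepts a list of many vectors if batching is preferred.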
index.upsert([vector])
print(f"Inserted chunk {chunk_id} from page {page_number} into Pinecone.")
# Update MongoDB with Pinecone ID
collection.update_one(
{"object_url": url, "chunks.chunk_id": chunk_id},
{
"$set": {
"chunks.$.pinecone_id": pinecone_id,
"chunks.$.successfully_embedding_created": True
}
}
)
return True
        except Exception as e:
            print(f"Error creating embedding for chunk {chunk.get('chunk_id')}: {e}")
            return False
failed_chunks = 0
with ThreadPoolExecutor(max_workers=8) as executor:
futures = {executor.submit(process_chunk, chunk): chunk for chunk in chunks}
for future in as_completed(futures):
if not future.result():
failed_chunks += 1
return failed_chunks
def process_pdf(url, tags, categories):
    """
    Process a PDF: download, chunk, and create embeddings.
    Returns True if at least one chunk was embedded successfully.
    """
    print(f"Processing PDF with URL: {url}")
    if not download_pdf(url):
        return False
    all_pages, total_pages = extract_text_from_pdf(pdf_path)
    print(f"Total pages: {total_pages}")
    print("Performing page-level semantic chunking in parallel...")
    semantic_chunks_with_pages = semantic_chunking_parallel(all_pages, url)
    print(f"Total chunks inserted into MongoDB: {len(semantic_chunks_with_pages)}")
    print("Creating embeddings and inserting into Pinecone in parallel...")
    failed_chunks = create_embedding_parallel(url, tags, categories)
    print(f"Total chunks successfully embedded: {len(semantic_chunks_with_pages) - failed_chunks}")
    print(f"Total chunks failed: {failed_chunks}")
    return failed_chunks < len(semantic_chunks_with_pages)
def extract_text_from_pdf(pdf_path):
    """
    Extract text from each page of a PDF using PyMuPDF (fitz).
    Returns (all_pages, total_pages), where each page is a dict with
    1-based "page_number" and its "page_content" text.
    """
pdf_document = fitz.open(pdf_path)
try:
all_pages = []
total_pages = len(pdf_document)
for page_num in range(total_pages):
page = pdf_document[page_num]
text = page.get_text()
all_pages.append({"page_content": text, "page_number": page_num + 1})
finally:
pdf_document.close()
return all_pages, total_pages
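

if __name__ == "__main__":
    # Minimal usage sketch. The URL, tags, and categories below are
    # placeholders (not from the original project); the MongoDB, Gemini,
    # and Pinecone environment variables above must be set for this to run.
    succeeded = process_pdf(
        "https://example.com/sample.pdf",
        tags=["statistics"],
        categories=["reports"],
    )
    print("PDF processed successfully." if succeeded else "PDF processing failed.")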