from concurrent.futures import ThreadPoolExecutor, as_completed
import os
import uuid

import requests
import fitz  # PyMuPDF
from pymongo import MongoClient
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_experimental.text_splitter import SemanticChunker
from dotenv import load_dotenv
import pinecone

# Load environment variables
load_dotenv()
MONGO_URI = os.getenv("MONGO_URI")
DB_NAME = os.getenv("DB_NAME")
COLLECTION_NAME = os.getenv("COLLECTION_NAME")
FLASH_API = os.getenv("FLASH_API")
PINECONE_API = os.getenv("PINECONE_API")
PINECONE_INDEX = os.getenv("PINECONE_INDEX")

# Initialize services
mongo_client = MongoClient(MONGO_URI)
db = mongo_client[DB_NAME]
collection = db[COLLECTION_NAME]

embed_model = GoogleGenerativeAIEmbeddings(
    model="models/embedding-001",  # Gemini embedding model
    google_api_key=FLASH_API,
)

pc = pinecone.Pinecone(api_key=PINECONE_API)
index = pc.Index(PINECONE_INDEX)

# Directory for temporary files
pdf_temp_dir = "temp/pdf_files"
os.makedirs(pdf_temp_dir, exist_ok=True)
pdf_path = os.path.join(pdf_temp_dir, "downloaded_file.pdf")


def download_pdf(url):
    """Download a PDF from `url` to the shared temp path."""
    try:
        response = requests.get(url, timeout=60)
        response.raise_for_status()  # Fail fast on HTTP errors instead of saving an error page
        with open(pdf_path, "wb") as pdf_file:
            pdf_file.write(response.content)
        return pdf_path
    except Exception as e:
        print(f"Error downloading PDF: {e}")
        return None


def semantic_chunking_parallel(pages, url):
    """
    Perform semantic chunking for pages in parallel and store chunks in MongoDB.
    Each chunk is assigned a unique chunk_id.
    """
    semantic_chunker = SemanticChunker(embed_model, breakpoint_threshold_type="percentile")

    def process_page(page):
        try:
            page_content = page["page_content"]
            page_number = page["page_number"]
            page_chunks = semantic_chunker.create_documents([page_content])
            enriched_chunks = []
            for chunk in page_chunks:
                chunk_id = str(uuid.uuid4())  # Unique ID for the chunk
                enriched_chunks.append({
                    "chunk_id": chunk_id,
                    "chunk": chunk.page_content,
                    "page_number": page_number,
                })
            return enriched_chunks
        except Exception as e:
            print(f"Error processing page {page['page_number']}: {e}")
            return []

    all_chunks = []
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(process_page, page): page for page in pages}
        for future in as_completed(futures):
            all_chunks.extend(future.result())

    # Insert all chunks into MongoDB in one upsert instead of one round trip per chunk
    if all_chunks:
        collection.update_one(
            {"object_url": url},
            {"$addToSet": {"chunks": {"$each": all_chunks}}},
            upsert=True,
        )
    return all_chunks


def create_embedding_parallel(url, tags, categories):
    """
    Create embeddings for chunks retrieved from MongoDB and insert them into Pinecone.
""" document = collection.find_one({"object_url": url}) mongo_id = str(document.get('_id')) if not document or "chunks" not in document: print(f"No chunks found for URL: {url}") return 0 chunks = document["chunks"] def process_chunk(chunk): try: chunk_id = chunk["chunk_id"] description = chunk["chunk"] page_number = chunk["page_number"] embedding = embed_model.embed_query(description) pinecone_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, description)) # Insert into Pinecone vector = { 'id': pinecone_id, 'values': embedding, 'metadata': { 'description': description, "url": url, "page_number": page_number, "tags": ','.join(tags), "categories": ','.join(categories), "filetype":"PDF", "mongo_id": mongo_id, } } index.upsert([vector]) print(f"Inserted chunk {chunk_id} from page {page_number} into Pinecone.") # Update MongoDB with Pinecone ID collection.update_one( {"object_url": url, "chunks.chunk_id": chunk_id}, { "$set": { "chunks.$.pinecone_id": pinecone_id, "chunks.$.successfully_embedding_created": True } } ) return True except Exception as e: print(f"Error creating embedding for chunk {chunk['chunk_id']}: {e}") return False failed_chunks = 0 with ThreadPoolExecutor(max_workers=8) as executor: futures = {executor.submit(process_chunk, chunk): chunk for chunk in chunks} for future in as_completed(futures): if not future.result(): failed_chunks += 1 return failed_chunks def process_pdf(url, tags, categories): """ Process a PDF: download, chunk, and create embeddings. """ print(f"Processing PDF with URL: {url}") if download_pdf(url): all_pages, total_pages = extract_text_from_pdf(pdf_path) print(f"Total pages: {total_pages}") print("Performing page-level semantic chunking in parallel...") semantic_chunks_with_pages = semantic_chunking_parallel(all_pages, url) print(f"Total chunks inserted into MongoDB: {len(semantic_chunks_with_pages)}") print("Creating embeddings and inserting into Pinecone in parallel...") failed_chunks = create_embedding_parallel(url, tags, categories) print(f"Total chunks successfully embedded: {len(semantic_chunks_with_pages) - failed_chunks}") print(f"Total chunks failed: {failed_chunks}") return failed_chunks < len(semantic_chunks_with_pages) def extract_text_from_pdf(pdf_path): """ Extract text from each page of a PDF using fitz. """ pdf_document = fitz.open(pdf_path) try: all_pages = [] total_pages = len(pdf_document) for page_num in range(total_pages): page = pdf_document[page_num] text = page.get_text() all_pages.append({"page_content": text, "page_number": page_num + 1}) finally: pdf_document.close() return all_pages, total_pages