Spaces:
Sleeping
Sleeping
import json
import os
import re
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed

import fitz
import pinecone
import requests
from dotenv import load_dotenv
from langchain_experimental.text_splitter import SemanticChunker
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from pymongo import MongoClient
# Load configuration from the local .env file into the process environment.
load_dotenv()

# Service credentials and targets (all sourced from the environment).
MONGO_URI = os.getenv("MONGO_URI")
DB_NAME = os.getenv("DB_NAME")
COLLECTION_NAME = os.getenv("COLLECTION_NAME")
FLASH_API = os.getenv("FLASH_API")
PINECONE_API = os.getenv("PINECONE_API")
PINECONE_INDEX = os.getenv("PINECONE_INDEX")

# MongoDB client: holds the per-document chunk records.
mongo_client = MongoClient(MONGO_URI)
db = mongo_client[DB_NAME]
collection = db[COLLECTION_NAME]

# Google embedding model, shared by the semantic chunker and the
# per-chunk embedding step.
embed_model = GoogleGenerativeAIEmbeddings(
    model="models/embedding-001",
    google_api_key=FLASH_API,
)

# Pinecone vector index for the chunk embeddings.
pc = pinecone.Pinecone(api_key=PINECONE_API)
index = pc.Index(PINECONE_INDEX)

# Scratch location for the downloaded PDF. NOTE(review): a single shared
# path means concurrent process_pdf calls would clobber each other's file.
pdf_temp_dir = 'temp/pdf_files'
os.makedirs(pdf_temp_dir, exist_ok=True)
pdf_path = os.path.join(pdf_temp_dir, 'downloaded_file.pdf')
def download_pdf(url):
    """Download the PDF at *url* to the shared temp path.

    Returns the local file path on success, or None on any failure.
    """
    try:
        # timeout= keeps a dead connection from hanging the pipeline
        # forever; raise_for_status() turns HTTP errors (404/500) into
        # exceptions instead of silently saving an error page as a "PDF".
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        with open(pdf_path, 'wb') as pdf_file:
            pdf_file.write(response.content)
        return pdf_path
    except Exception as e:
        # Best-effort contract: callers test the return value, so report
        # and return None rather than propagating.
        print(f"Error downloading PDF: {e}")
        return None
def semantic_chunking_parallel(pages, url):
    """
    Perform semantic chunking for pages in parallel and store chunks in MongoDB.
    Each chunk is assigned a unique chunk_id.

    :param pages: list of dicts with "page_content" and "page_number" keys
    :param url: object URL used as the MongoDB document key
    :return: list of chunk dicts ({"chunk_id", "chunk", "page_number"})
    """
    semantic_chunker = SemanticChunker(embed_model, breakpoint_threshold_type="percentile")

    def process_page(page):
        # Chunk one page; returns [] on failure so one bad page does not
        # abort the whole document.
        try:
            page_content = page["page_content"]
            page_number = page["page_number"]
            page_chunks = semantic_chunker.create_documents([page_content])
            enriched_chunks = []
            for chunk in page_chunks:
                chunk_id = str(uuid.uuid4())  # unique ID for this chunk
                enriched_chunks.append({
                    "chunk_id": chunk_id,
                    "chunk": chunk.page_content,
                    "page_number": page_number
                })
            return enriched_chunks
        except Exception as e:
            print(f"Error processing page {page['page_number']}: {e}")
            return []

    all_chunks = []
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(process_page, page): page for page in pages}
        for future in as_completed(futures):
            all_chunks.extend(future.result())

    # Insert chunks into MongoDB in ONE round trip ($each) instead of one
    # update per chunk. $addToSet semantics are preserved per element, and
    # since every chunk carries a fresh UUID no deduplication ever fires.
    if all_chunks:
        collection.update_one(
            {"object_url": url},
            {"$addToSet": {"chunks": {"$each": all_chunks}}},
            upsert=True,
        )
    return all_chunks
def create_embedding_parallel(url, tags, categories):
    """
    Create embeddings for chunks retrieved from MongoDB and insert them into Pinecone.

    :param url: object URL keying the MongoDB document
    :param tags: list of tag strings stored as Pinecone metadata
    :param categories: list of category strings stored as Pinecone metadata
    :return: number of chunks that FAILED to embed (0 means full success)
    """
    document = collection.find_one({"object_url": url})
    # Guard BEFORE dereferencing: the original read document['_id'] first,
    # which raised AttributeError on None when no document matched.
    if not document or "chunks" not in document:
        print(f"No chunks found for URL: {url}")
        return 0
    mongo_id = str(document.get('_id'))
    chunks = document["chunks"]

    def process_chunk(chunk):
        # Embed one chunk and upsert it; returns True on success.
        try:
            chunk_id = chunk["chunk_id"]
            description = chunk["chunk"]
            page_number = chunk["page_number"]
            embedding = embed_model.embed_query(description)
            # Deterministic ID from the text — re-runs overwrite rather than
            # duplicate. NOTE(review): identical text on two pages collides
            # to one vector; confirm that is acceptable.
            pinecone_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, description))
            vector = {
                'id': pinecone_id,
                'values': embedding,
                'metadata': {
                    'description': description,
                    "url": url,
                    "page_number": page_number,
                    "tags": ','.join(tags),
                    "categories": ','.join(categories),
                    "filetype": "PDF",
                    "mongo_id": mongo_id,
                }
            }
            index.upsert([vector])
            print(f"Inserted chunk {chunk_id} from page {page_number} into Pinecone.")
            # Record the Pinecone ID back on the matching array element.
            collection.update_one(
                {"object_url": url, "chunks.chunk_id": chunk_id},
                {
                    "$set": {
                        "chunks.$.pinecone_id": pinecone_id,
                        "chunks.$.successfully_embedding_created": True
                    }
                }
            )
            return True
        except Exception as e:
            print(f"Error creating embedding for chunk {chunk['chunk_id']}: {e}")
            return False

    failed_chunks = 0
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(process_chunk, chunk): chunk for chunk in chunks}
        for future in as_completed(futures):
            if not future.result():
                failed_chunks += 1
    return failed_chunks
def process_pdf(url, tags, categories):
    """
    Process a PDF: download, chunk, and create embeddings.

    :param url: source URL of the PDF
    :param tags: tag strings attached to every Pinecone vector
    :param categories: category strings attached to every Pinecone vector
    :return: True if at least one chunk embedded successfully, else False
    """
    print(f"Processing PDF with URL: {url}")
    # Guard clause: the original fell through and implicitly returned None
    # on a failed download; make the failure explicit (still falsy).
    if not download_pdf(url):
        return False
    all_pages, total_pages = extract_text_from_pdf(pdf_path)
    print(f"Total pages: {total_pages}")
    print("Performing page-level semantic chunking in parallel...")
    semantic_chunks_with_pages = semantic_chunking_parallel(all_pages, url)
    print(f"Total chunks inserted into MongoDB: {len(semantic_chunks_with_pages)}")
    print("Creating embeddings and inserting into Pinecone in parallel...")
    failed_chunks = create_embedding_parallel(url, tags, categories)
    print(f"Total chunks successfully embedded: {len(semantic_chunks_with_pages) - failed_chunks}")
    print(f"Total chunks failed: {failed_chunks}")
    # True iff at least one chunk succeeded (also False when zero chunks).
    return failed_chunks < len(semantic_chunks_with_pages)
def extract_text_from_pdf(pdf_path):
    """Extract text from each page of a PDF using fitz (PyMuPDF).

    :param pdf_path: filesystem path of the PDF to read
    :return: (pages, total_pages) where pages is a list of dicts holding
             "page_content" (raw page text) and a 1-based "page_number"
    """
    pdf_document = fitz.open(pdf_path)
    try:
        total_pages = len(pdf_document)
        # Build the per-page records in one pass; page numbers are 1-based.
        all_pages = [
            {
                "page_content": pdf_document[idx].get_text(),
                "page_number": idx + 1,
            }
            for idx in range(total_pages)
        ]
    finally:
        # Always release the document handle, even if extraction fails.
        pdf_document.close()
    return all_pages, total_pages