File size: 6,723 Bytes
2fcb36b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import fitz
from pymongo import MongoClient
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_experimental.text_splitter import SemanticChunker
import os
import re
import json
import uuid
from dotenv import load_dotenv
import pinecone

# Load environment variables
# All service credentials come from the environment (.env file) so no
# secrets live in the source. Missing variables yield None and will fail
# at client-construction time below.
load_dotenv()
MONGO_URI = os.getenv("MONGO_URI")          # MongoDB connection string
DB_NAME = os.getenv("DB_NAME")              # target database name
COLLECTION_NAME = os.getenv("COLLECTION_NAME")  # collection storing per-PDF documents
FLASH_API = os.getenv("FLASH_API")          # Google API key for the embedding model
PINECONE_API = os.getenv("PINECONE_API")    # Pinecone API key
PINECONE_INDEX = os.getenv("PINECONE_INDEX")    # Pinecone index name

# Initialize services
# NOTE(review): these clients are built at import time; importing this
# module therefore requires all env vars above to be set.
mongo_client = MongoClient(MONGO_URI)
db = mongo_client[DB_NAME]
collection = db[COLLECTION_NAME]

embed_model = GoogleGenerativeAIEmbeddings(
    model="models/embedding-001",  # Google embedding model name
    google_api_key=FLASH_API  # Your API key
)
pc = pinecone.Pinecone(
    api_key=PINECONE_API  # Your Pinecone API key
)
index = pc.Index(PINECONE_INDEX)

# Directories for temp files
# Every download reuses the same path, so concurrent process_pdf calls
# would clobber each other's file — presumably this runs one PDF at a time.
pdf_temp_dir = 'temp/pdf_files'
os.makedirs(pdf_temp_dir, exist_ok=True)
pdf_path = os.path.join(pdf_temp_dir, 'downloaded_file.pdf')


def download_pdf(url):
    """Download the PDF at *url* to the shared temp path.

    Returns:
        The local file path on success, or ``None`` on any failure.
    """
    try:
        # Timeout keeps a hung connection from stalling the whole pipeline;
        # raise_for_status prevents saving an HTTP error page as a "PDF"
        # (the original wrote response.content unconditionally).
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        with open(pdf_path, 'wb') as pdf_file:
            pdf_file.write(response.content)
        return pdf_path
    except Exception as e:
        print(f"Error downloading PDF: {e}")
        return None


def semantic_chunking_parallel(pages, url):
    """Semantically chunk each page in parallel and persist chunks in MongoDB.

    Each chunk is assigned a unique ``chunk_id`` so later embedding passes
    can address it individually.

    Args:
        pages: list of dicts with ``page_content`` and ``page_number`` keys.
        url: the source object URL, used as the MongoDB document key.

    Returns:
        The list of enriched chunk dicts that were stored.
    """
    semantic_chunker = SemanticChunker(embed_model, breakpoint_threshold_type="percentile")

    def process_page(page):
        # Chunk one page; failures are logged and yield no chunks so a
        # single bad page does not abort the whole document.
        try:
            page_content = page["page_content"]
            page_number = page["page_number"]
            page_chunks = semantic_chunker.create_documents([page_content])
            return [
                {
                    "chunk_id": str(uuid.uuid4()),  # unique ID per chunk
                    "chunk": chunk.page_content,
                    "page_number": page_number,
                }
                for chunk in page_chunks
            ]
        except Exception as e:
            print(f"Error processing page {page['page_number']}: {e}")
            return []

    all_chunks = []
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(process_page, page) for page in pages]
        for future in as_completed(futures):
            all_chunks.extend(future.result())

    # Insert chunks into MongoDB in ONE round trip. The original issued one
    # update_one per chunk; $addToSet with $each has identical dedup
    # semantics but avoids N network round trips. Guarded so an empty
    # result does not upsert a document (matching the original's no-op).
    if all_chunks:
        collection.update_one(
            {"object_url": url},
            {"$addToSet": {"chunks": {"$each": all_chunks}}},
            upsert=True
        )

    return all_chunks

def create_embedding_parallel(url, tags, categories):
    """Embed the stored chunks for *url* and upsert them into Pinecone.

    Args:
        url: the source object URL keying the MongoDB document.
        tags: list of tag strings stored as comma-joined metadata.
        categories: list of category strings stored as comma-joined metadata.

    Returns:
        The number of chunks that failed to embed (0 on full success).
    """
    document = collection.find_one({"object_url": url})
    # BUG FIX: guard BEFORE dereferencing — the original read
    # document.get('_id') first and raised AttributeError when the
    # document was missing instead of returning 0.
    if not document or "chunks" not in document:
        print(f"No chunks found for URL: {url}")
        return 0
    mongo_id = str(document["_id"])

    chunks = document["chunks"]

    def process_chunk(chunk):
        # Embed one chunk, upsert it into Pinecone, then record the
        # Pinecone ID back onto the matching chunk in MongoDB.
        try:
            chunk_id = chunk["chunk_id"]
            description = chunk["chunk"]
            page_number = chunk["page_number"]

            embedding = embed_model.embed_query(description)
            # uuid5 is deterministic on content, so re-embedding the same
            # text overwrites rather than duplicates the Pinecone vector.
            pinecone_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, description))

            # Insert into Pinecone
            vector = {
                'id': pinecone_id,
                'values': embedding,
                'metadata': {
                    'description': description,
                    "url": url,
                    "page_number": page_number,
                    "tags": ','.join(tags),
                    "categories": ','.join(categories),
                    "filetype": "PDF",
                    "mongo_id": mongo_id,
                }
            }
            index.upsert([vector])
            print(f"Inserted chunk {chunk_id} from page {page_number} into Pinecone.")

            # Update MongoDB with Pinecone ID ($ targets the matched chunk)
            collection.update_one(
                {"object_url": url, "chunks.chunk_id": chunk_id},
                {
                    "$set": {
                        "chunks.$.pinecone_id": pinecone_id,
                        "chunks.$.successfully_embedding_created": True
                    }
                }
            )
            return True
        except Exception as e:
            print(f"Error creating embedding for chunk {chunk['chunk_id']}: {e}")
            return False

    failed_chunks = 0
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(process_chunk, chunk) for chunk in chunks]
        for future in as_completed(futures):
            if not future.result():
                failed_chunks += 1
    return failed_chunks



def process_pdf(url, tags, categories):
    """Process a PDF end-to-end: download, chunk, and create embeddings.

    Args:
        url: URL of the PDF to process.
        tags: tag strings attached to every embedded chunk.
        categories: category strings attached to every embedded chunk.

    Returns:
        ``True`` if at least one chunk embedded successfully,
        ``False`` otherwise (including download failure).
    """
    print(f"Processing PDF with URL: {url}")
    if not download_pdf(url):
        # BUG FIX: the original fell off the end here and implicitly
        # returned None; make the failure explicit for callers.
        return False

    all_pages, total_pages = extract_text_from_pdf(pdf_path)
    print(f"Total pages: {total_pages}")

    print("Performing page-level semantic chunking in parallel...")
    semantic_chunks_with_pages = semantic_chunking_parallel(all_pages, url)
    print(f"Total chunks inserted into MongoDB: {len(semantic_chunks_with_pages)}")

    print("Creating embeddings and inserting into Pinecone in parallel...")
    failed_chunks = create_embedding_parallel(url, tags, categories)
    print(f"Total chunks successfully embedded: {len(semantic_chunks_with_pages) - failed_chunks}")
    print(f"Total chunks failed: {failed_chunks}")

    # Success only when strictly more chunks exist than failed.
    return failed_chunks < len(semantic_chunks_with_pages)



def extract_text_from_pdf(pdf_path):
    """Extract the text of every page in a PDF via fitz (PyMuPDF).

    Returns:
        A tuple ``(pages, page_count)`` where *pages* is a list of dicts
        with ``page_content`` and 1-based ``page_number`` keys.
    """
    doc = fitz.open(pdf_path)
    try:
        page_count = len(doc)
        pages = [
            {"page_content": doc[i].get_text(), "page_number": i + 1}
            for i in range(page_count)
        ]
    finally:
        # Always release the document handle, even if extraction fails.
        doc.close()
    return pages, page_count