#!/usr/bin/env python3 """ Pre-processes PDF files in the data directory, creating and saving: 1. Document chunks with metadata 2. Vector embeddings This allows the app to load pre-processed data directly instead of processing PDFs at runtime, making the app start faster and eliminating the need to upload PDFs to Hugging Face. """ import os import pickle import tiktoken from pathlib import Path from collections import defaultdict import json # Import required LangChain modules from langchain_community.document_loaders import DirectoryLoader from langchain_community.document_loaders import PyPDFLoader from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_openai.embeddings import OpenAIEmbeddings from langchain_qdrant import Qdrant from langchain_core.documents import Document from qdrant_client import QdrantClient from qdrant_client.models import Distance, VectorParams, PointStruct # Set OpenAI API key from dotenv import load_dotenv load_dotenv() # Ensure OpenAI API key is available if not os.environ.get("OPENAI_API_KEY"): raise EnvironmentError("OPENAI_API_KEY environment variable not found. Please set it in your .env file.") # Create directories to store pre-processed data PROCESSED_DATA_DIR = Path("processed_data") PROCESSED_DATA_DIR.mkdir(exist_ok=True) CHUNKS_FILE = PROCESSED_DATA_DIR / "document_chunks.pkl" QDRANT_DIR = PROCESSED_DATA_DIR / "qdrant_vectorstore" def load_and_process_documents(): """ Load PDF documents, merge them by source, and split into chunks. This is the same process used in the notebook and app.py. """ print("Loading PDF documents...") path = "data/" loader = DirectoryLoader(path, glob="*.pdf", loader_cls=PyPDFLoader) all_docs = loader.load() print(f"Loaded {len(all_docs)} pages from PDF files") # Create a mapping of merged document chunks back to original pages page_map = {} current_index = 0 # For source tracking, we'll store page information before merging docs_by_source = defaultdict(list) # Group documents by their source file for doc in all_docs: source = doc.metadata.get("source", "") docs_by_source[source].append(doc) # Merge pages from the same PDF but track page ranges merged_docs = [] for source, source_docs in docs_by_source.items(): # Sort by page number if available source_docs.sort(key=lambda x: x.metadata.get("page", 0)) # Merge the content merged_content = "" page_ranges = [] current_pos = 0 for doc in source_docs: # Get the page number (1-indexed for human readability) page_num = doc.metadata.get("page", 0) + 1 # Add a separator between pages for clarity if merged_content: merged_content += "\n\n" # Record where this page's content starts in the merged document start_pos = len(merged_content) merged_content += doc.page_content end_pos = len(merged_content) # Store the mapping of character ranges to original page numbers page_ranges.append({ "start": start_pos, "end": end_pos, "page": page_num, "source": source }) # Create merged metadata that includes page mapping information merged_metadata = { "source": source, "title": source.split("/")[-1], "page_count": len(source_docs), "merged": True, "page_ranges": page_ranges # Store the page ranges for later reference } # Create a new document with the merged content merged_doc = Document(page_content=merged_content, metadata=merged_metadata) merged_docs.append(merged_doc) print(f"Created {len(merged_docs)} merged documents") # tiktoken_len counts tokens (not characters) using the gpt-4o-mini tokenizer def tiktoken_len(text): tokens = tiktoken.encoding_for_model("gpt-4o-mini").encode( text, ) return len(tokens) # Process splits to add page info based on character position def add_page_info_to_splits(splits): for split in splits: # Get the start position of this chunk start_pos = split.metadata.get("start_index", 0) end_pos = start_pos + len(split.page_content) # Find which page this chunk belongs to if "page_ranges" in split.metadata: for page_range in split.metadata["page_ranges"]: # If chunk significantly overlaps with this page range if (start_pos <= page_range["end"] and end_pos >= page_range["start"]): # Use this page number split.metadata["page"] = page_range["page"] break return splits # Split the text with start index tracking print("Splitting documents into chunks...") text_splitter = RecursiveCharacterTextSplitter( chunk_size=300, chunk_overlap=50, length_function=tiktoken_len, add_start_index=True ) # Split and then process to add page information raw_splits = text_splitter.split_documents(merged_docs) split_chunks = add_page_info_to_splits(raw_splits) print(f"Created {len(split_chunks)} document chunks") return split_chunks def create_and_save_vectorstore(chunks): """ Create a vector store from document chunks and save it to disk. """ print("Creating embeddings and vector store...") embedding_model = OpenAIEmbeddings(model="text-embedding-3-small") # Extract text and metadata for separate processing texts = [doc.page_content for doc in chunks] metadatas = [doc.metadata for doc in chunks] # Ensure the directory exists QDRANT_DIR.mkdir(exist_ok=True, parents=True) # Create a local Qdrant client client = QdrantClient(path=str(QDRANT_DIR)) # Get the embedding dimension sample_embedding = embedding_model.embed_query("Sample text") # Create the collection if it doesn't exist collection_name = "kohavi_ab_testing_pdf_collection" try: collection_info = client.get_collection(collection_name) print(f"Collection {collection_name} already exists") except Exception: # Collection doesn't exist, create it print(f"Creating collection {collection_name}") client.create_collection( collection_name=collection_name, vectors_config=VectorParams( size=len(sample_embedding), distance=Distance.COSINE ) ) # Process in batches to avoid memory issues batch_size = 100 print(f"Processing {len(texts)} documents in batches of {batch_size}") for i in range(0, len(texts), batch_size): batch_texts = texts[i:i+batch_size] batch_metadatas = metadatas[i:i+batch_size] print(f"Processing batch {i//batch_size + 1}/{(len(texts) + batch_size - 1)//batch_size}") # Get embeddings for this batch embeddings = embedding_model.embed_documents(batch_texts) # Create points for this batch points = [] for j, (text, embedding, metadata) in enumerate(zip(batch_texts, embeddings, batch_metadatas)): points.append(PointStruct( id=i + j, vector=embedding, payload={ "text": text, "metadata": metadata } )) # Upsert points into the collection client.upsert( collection_name=collection_name, points=points ) print(f"Vector store created and saved to {QDRANT_DIR}") return True def main(): # Load and process documents print("Starting pre-processing of PDF files...") chunks = load_and_process_documents() # Save chunks to disk print(f"Saving {len(chunks)} document chunks to {CHUNKS_FILE}...") with open(CHUNKS_FILE, 'wb') as f: pickle.dump(chunks, f) print(f"Chunks saved to {CHUNKS_FILE}") # Create and save vector store success = create_and_save_vectorstore(chunks) if success: print("Pre-processing complete! The application can now use these pre-processed files.") print(f"- Document chunks: {CHUNKS_FILE}") print(f"- Vector store: {QDRANT_DIR}") else: print("Error creating vector store. Please check the logs.") if __name__ == "__main__": main()