Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
"""
Pre-processes PDF files in the data directory, creating and saving:
1. Document chunks with metadata
2. Vector embeddings

This allows the app to load pre-processed data directly instead of processing
PDFs at runtime, making the app start faster and eliminating the need to
upload PDFs to Hugging Face.
"""
import os
import pickle
import tiktoken
from pathlib import Path
from collections import defaultdict
import json  # NOTE(review): appears unused in this script — confirm before removing
# Import required LangChain modules
from langchain_community.document_loaders import DirectoryLoader
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain_qdrant import Qdrant  # NOTE(review): imported but not referenced below — confirm before removing
from langchain_core.documents import Document
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
# Set OpenAI API key
from dotenv import load_dotenv
# Load variables from .env into the environment BEFORE the key check below.
load_dotenv()
# Ensure OpenAI API key is available
if not os.environ.get("OPENAI_API_KEY"):
    raise EnvironmentError("OPENAI_API_KEY environment variable not found. Please set it in your .env file.")
# Create directories to store pre-processed data
PROCESSED_DATA_DIR = Path("processed_data")
PROCESSED_DATA_DIR.mkdir(exist_ok=True)
# Pickled list of LangChain Document chunks produced by load_and_process_documents().
CHUNKS_FILE = PROCESSED_DATA_DIR / "document_chunks.pkl"
# On-disk Qdrant database written by create_and_save_vectorstore().
QDRANT_DIR = PROCESSED_DATA_DIR / "qdrant_vectorstore"
def load_and_process_documents():
    """
    Load PDF documents, merge them by source, and split into chunks.

    Each source PDF's pages are concatenated into one Document (with a
    blank line between pages) so chunks can span page boundaries. A
    ``page_ranges`` list in the merged metadata maps character offsets
    back to 1-indexed page numbers; after splitting, each chunk is
    stamped with the page its text overlaps.

    Returns:
        list[Document]: token-sized chunks, each carrying a ``page``
        metadata entry when a page range could be matched.
    """
    print("Loading PDF documents...")
    path = "data/"
    loader = DirectoryLoader(path, glob="*.pdf", loader_cls=PyPDFLoader)
    all_docs = loader.load()
    print(f"Loaded {len(all_docs)} pages from PDF files")

    # Group the per-page documents by their source file so each PDF can
    # be merged into a single document below.
    docs_by_source = defaultdict(list)
    for doc in all_docs:
        source = doc.metadata.get("source", "")
        docs_by_source[source].append(doc)

    # Merge pages from the same PDF but track page ranges.
    merged_docs = []
    for source, source_docs in docs_by_source.items():
        # Sort by page number if available so the merged text reads in order.
        source_docs.sort(key=lambda x: x.metadata.get("page", 0))

        merged_content = ""
        page_ranges = []
        for doc in source_docs:
            # Page number, 1-indexed for human readability.
            page_num = doc.metadata.get("page", 0) + 1
            # Add a separator between pages for clarity.
            if merged_content:
                merged_content += "\n\n"
            # Record the character span this page occupies in the merged text.
            start_pos = len(merged_content)
            merged_content += doc.page_content
            end_pos = len(merged_content)
            page_ranges.append({
                "start": start_pos,
                "end": end_pos,
                "page": page_num,
                "source": source
            })

        # Merged metadata includes the page mapping for later reference.
        merged_metadata = {
            "source": source,
            "title": source.split("/")[-1],
            "page_count": len(source_docs),
            "merged": True,
            "page_ranges": page_ranges
        }
        merged_docs.append(Document(page_content=merged_content, metadata=merged_metadata))
    print(f"Created {len(merged_docs)} merged documents")

    # Hoisted: resolve the tokenizer once, not on every length-function
    # call inside the splitter (the original re-resolved it per call).
    encoding = tiktoken.encoding_for_model("gpt-4o-mini")

    def tiktoken_len(text):
        # Length function for the splitter: counts tokens, not characters.
        return len(encoding.encode(text))

    def add_page_info_to_splits(splits):
        # Stamp each chunk with a page number, using the start_index the
        # splitter recorded and the page_ranges map built above.
        for split in splits:
            start_pos = split.metadata.get("start_index", 0)
            end_pos = start_pos + len(split.page_content)
            if "page_ranges" in split.metadata:
                for page_range in split.metadata["page_ranges"]:
                    # First page range the chunk overlaps at all wins
                    # (i.e. the page the chunk starts on).
                    if (start_pos <= page_range["end"] and
                            end_pos >= page_range["start"]):
                        split.metadata["page"] = page_range["page"]
                        break
        return splits

    # Split the text with start-index tracking so each chunk can be
    # mapped back to its originating page.
    print("Splitting documents into chunks...")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=300,
        chunk_overlap=50,
        length_function=tiktoken_len,
        add_start_index=True
    )
    raw_splits = text_splitter.split_documents(merged_docs)
    split_chunks = add_page_info_to_splits(raw_splits)
    print(f"Created {len(split_chunks)} document chunks")
    return split_chunks
def create_and_save_vectorstore(chunks):
    """
    Create a vector store from document chunks and save it to disk.

    Embeds every chunk with OpenAI's text-embedding-3-small model and
    upserts the vectors (with the chunk text and metadata as payload)
    into a local on-disk Qdrant collection under QDRANT_DIR.

    Args:
        chunks: documents exposing ``page_content`` and ``metadata``
            (LangChain Document objects).

    Returns:
        bool: True once the vector store has been written.
    """
    print("Creating embeddings and vector store...")
    embedding_model = OpenAIEmbeddings(model="text-embedding-3-small")

    # Extract text and metadata for separate processing.
    texts = [doc.page_content for doc in chunks]
    metadatas = [doc.metadata for doc in chunks]

    # Ensure the on-disk location exists before handing it to Qdrant.
    QDRANT_DIR.mkdir(exist_ok=True, parents=True)
    client = QdrantClient(path=str(QDRANT_DIR))

    # Probe the model once to learn the embedding dimension.
    sample_embedding = embedding_model.embed_query("Sample text")

    collection_name = "kohavi_ab_testing_pdf_collection"
    try:
        # Raises if the collection does not exist yet.
        client.get_collection(collection_name)
        print(f"Collection {collection_name} already exists")
    except Exception:
        # Collection doesn't exist, create it.
        print(f"Creating collection {collection_name}")
        client.create_collection(
            collection_name=collection_name,
            vectors_config=VectorParams(
                size=len(sample_embedding),
                distance=Distance.COSINE
            )
        )

    # Process in batches to bound memory use and embedding request size.
    batch_size = 100
    total_batches = (len(texts) + batch_size - 1) // batch_size  # hoisted out of the loop
    print(f"Processing {len(texts)} documents in batches of {batch_size}")
    for i in range(0, len(texts), batch_size):
        batch_texts = texts[i:i+batch_size]
        batch_metadatas = metadatas[i:i+batch_size]
        print(f"Processing batch {i//batch_size + 1}/{total_batches}")

        # Get embeddings for this batch.
        embeddings = embedding_model.embed_documents(batch_texts)

        # Point ids are the global chunk index so a re-run overwrites
        # (upserts) existing points rather than duplicating them.
        points = [
            PointStruct(
                id=i + j,
                vector=embedding,
                payload={
                    "text": text,
                    "metadata": metadata
                }
            )
            for j, (text, embedding, metadata) in enumerate(
                zip(batch_texts, embeddings, batch_metadatas))
        ]
        client.upsert(
            collection_name=collection_name,
            points=points
        )
    print(f"Vector store created and saved to {QDRANT_DIR}")
    return True
def main():
    """Drive the full pre-processing pipeline: chunk PDFs, persist the
    chunks, then build and save the vector store."""
    print("Starting pre-processing of PDF files...")
    chunks = load_and_process_documents()

    # Persist the chunks so the app can skip PDF parsing at startup.
    print(f"Saving {len(chunks)} document chunks to {CHUNKS_FILE}...")
    with CHUNKS_FILE.open('wb') as f:
        pickle.dump(chunks, f)
    print(f"Chunks saved to {CHUNKS_FILE}")

    # Build and persist the vector store, then report the output locations.
    if create_and_save_vectorstore(chunks):
        print("Pre-processing complete! The application can now use these pre-processed files.")
        print(f"- Document chunks: {CHUNKS_FILE}")
        print(f"- Vector store: {QDRANT_DIR}")
    else:
        print("Error creating vector store. Please check the logs.")


if __name__ == "__main__":
    main()