Spaces:
Running
Running
| import hashlib | |
| import os | |
| from glob import glob | |
| from pathlib import Path | |
| from langchain_community.document_loaders import PyPDFLoader | |
| from langchain_community.vectorstores import Chroma | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from runtime_config import CHROMA_DB_DIR, KNOWLEDGE_BASE_PATH | |
def build_vector_db(
    persist_directory: str | None = None,
    knowledge_base_path: str | None = None,
):
    """
    Build or load a PDF-backed Chroma vector database and return a retriever.

    Args:
        persist_directory: Directory where the Chroma database lives.
            Falls back to ``CHROMA_DB_DIR`` when not given.
        knowledge_base_path: Directory scanned recursively for ``*.pdf``
            files. Falls back to ``KNOWLEDGE_BASE_PATH`` when not given.

    Returns:
        A retriever over the indexed PDF chunks (top-5 results), or an
        empty retriever when no PDF files are available.

    Raises:
        ValueError: If PDF files were found but none could be loaded.
    """
    persist_path = Path(persist_directory).resolve() if persist_directory else CHROMA_DB_DIR
    knowledge_path = (
        Path(knowledge_base_path).resolve()
        if knowledge_base_path
        else KNOWLEDGE_BASE_PATH
    )

    if not knowledge_path.exists():
        knowledge_path.mkdir(parents=True, exist_ok=True)
        print(f"Knowledge base directory '{knowledge_path}' was created but has no PDF files.")
        print("Place your PDF files there and restart the application.")
        return _empty_retriever()

    pdf_files = glob(os.path.join(str(knowledge_path), "**/*.pdf"), recursive=True)
    if not pdf_files:
        print(f"No PDF files found in '{knowledge_path}'.")
        return _empty_retriever()
    print(f"Found {len(pdf_files)} PDF files for the knowledge base.")

    all_docs = _load_pdf_documents(pdf_files)
    if not all_docs:
        raise ValueError("No valid PDF content was successfully loaded.")

    # Small chunks with modest overlap; separators fall back from paragraph
    # to sentence to word boundaries.
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50,
        separators=["\n\n", "\n", ".", "!", "?", ";", ",", " ", ""],
    )
    splits = text_splitter.split_documents(all_docs)

    persist_path.mkdir(parents=True, exist_ok=True)
    embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

    if any(persist_path.iterdir()):
        print(f"Loading existing vector database from {persist_path}...")
        vectorstore = Chroma(
            persist_directory=str(persist_path),
            embedding_function=embeddings,
        )
        existing_files = _existing_source_paths(vectorstore)
        # BUG FIX: previously *all* splits were re-added whenever any new
        # PDF appeared, duplicating every already-indexed document. Only
        # add chunks that come from files not yet present in the store.
        new_files = {pdf_file for pdf_file in pdf_files if pdf_file not in existing_files}
        if new_files:
            print("New PDF files detected. Updating vector store...")
            new_splits = [
                split
                for split in splits
                if split.metadata.get("source_path") in new_files
            ]
            if new_splits:
                vectorstore.add_documents(new_splits)
    else:
        print(f"Creating new vector database at {persist_path}...")
        vectorstore = Chroma.from_documents(
            documents=splits,
            embedding=embeddings,
            persist_directory=str(persist_path),
        )

    print(f"Vector database ready at {persist_path}.")
    return vectorstore.as_retriever(search_kwargs={"k": 5})


def _load_pdf_documents(pdf_files):
    """Load each PDF, dedupe pages by content hash, and tag source metadata."""
    all_docs = []
    processed_hashes = set()
    for pdf_file in pdf_files:
        try:
            print(f"Loading {os.path.basename(pdf_file)}...")
            docs = PyPDFLoader(pdf_file).load()
            for doc in docs:
                # md5 is used as a cheap dedup fingerprint, not for security.
                content_hash = hashlib.md5(doc.page_content.encode()).hexdigest()
                if content_hash in processed_hashes:
                    continue
                processed_hashes.add(content_hash)
                doc.metadata.update(
                    {
                        "source": os.path.basename(pdf_file),
                        "source_path": pdf_file,
                        "file_size": f"{os.path.getsize(pdf_file) / 1024:.1f}KB",
                        "content_hash": content_hash,
                    }
                )
                all_docs.append(doc)
        except Exception as error:
            # Best-effort: a single corrupt PDF must not abort the whole build.
            print(f"Failed to load {os.path.basename(pdf_file)}: {error}")
    return all_docs


def _existing_source_paths(vectorstore):
    """Return the set of `source_path` values already indexed in the store."""
    metadatas = vectorstore.get().get("metadatas", [])
    return {
        metadata.get("source_path")
        for metadata in metadatas
        if isinstance(metadata, dict) and metadata.get("source_path")
    }
def _empty_retriever():
    """Build a no-op retriever returned when no knowledge base exists."""
    # Imported lazily so the langchain_core dependency is only touched on
    # this fallback path.
    from langchain_core.retrievers import BaseRetriever

    class EmptyRetriever(BaseRetriever):
        """Retriever that answers every query with an empty result list."""

        def _get_relevant_documents(self, query):
            # Nothing indexed, so there is never anything relevant.
            return []

        async def _aget_relevant_documents(self, query):
            # Async variant mirrors the sync behavior exactly.
            return []

    return EmptyRetriever()
| if __name__ == "__main__": | |
| retriever = build_vector_db() | |
| for query in ( | |
| "What is P/E ratio?", | |
| "What are the principles of value investing?", | |
| "How to analyze financial statements?", | |
| ): | |
| docs = retriever.invoke(query) | |
| print(f"\nQuery: {query}") | |
| if docs: | |
| for index, doc in enumerate(docs[:2], start=1): | |
| print( | |
| f"{index}. [{doc.metadata.get('source', 'Unknown')}] " | |
| f"{doc.page_content[:100]}..." | |
| ) | |
| else: | |
| print("No relevant content found.") | |