"""
Create Vector Database with Embeddings and RAG
Enhanced with OCR for scanned PDFs and Markdown support
Pure LangChain implementation, config-driven, no hardcoding
"""

import sys
from pathlib import Path

# Make the project root importable before the local imports below.
sys.path.append(str(Path(__file__).resolve().parents[1]))

from collections import Counter
from typing import List, Dict, Any, Optional, Tuple
import json
import io
import re

# LangChain imports
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.document_loaders import (
    PyMuPDFLoader,
    TextLoader,
    DirectoryLoader,
    UnstructuredMarkdownLoader
)

# OCR and image utilities (all optional; OCR is disabled when any is missing)
try:
    import fitz  # PyMuPDF
    HAS_PYMUPDF = True
except ImportError:
    HAS_PYMUPDF = False
    fitz = None

try:
    from PIL import Image
    HAS_PIL = True
except ImportError:
    HAS_PIL = False
    Image = None

try:
    import pytesseract
    HAS_TESSERACT = True
except ImportError:
    HAS_TESSERACT = False
    pytesseract = None

# Local imports
from utils import get_utils
from nlp_processor import create_nlp_processor

# NOTE(review): several log messages below contain byte sequences that look
# like double-encoded emoji. They are reproduced unchanged here because some
# of them are ambiguous; confirm the intended glyphs before repairing them.

# Matches "volume 3", "volume_3", "volume-3", "volume3" in a lower-cased name.
_VOLUME_PATTERN = re.compile(r'volume[_\s-]?(\d+)')


# ============================================================================
# DOCUMENT LOADER WITH ENHANCED PDF & MARKDOWN SUPPORT
# ============================================================================

class DocumentLoader:
    """Load documents from various sources with OCR fallback"""

    # OCR output shorter than this (after stripping) is discarded.
    _MIN_OCR_RESULT_CHARS = 50

    # Keyword -> work type; the first keyword found (in this order) wins.
    _WORK_TYPE_MAP = {
        'karma': 'Karma-Yoga',
        'raja': 'Raja-Yoga',
        'bhakti': 'Bhakti-Yoga',
        'jnana': 'Jnana-Yoga',
        'letter': 'Letters',
        'speech': 'Speeches',
        'address': 'Speeches',
        'inspired': 'Inspired Talks',
        'talk': 'Talks',
        'lecture': 'Lectures'
    }

    # Keyword -> special-content label; the first keyword found wins.
    _SPECIAL_KEYWORDS = {
        'chicago': 'Chicago Address',
        'parliament': 'Parliament Speech',
        'colombo': 'Colombo to Almora',
        'almora': 'Colombo to Almora'
    }

    def __init__(self, utils):
        """Keep references to the shared utilities and probe OCR support.

        Args:
            utils: Project utilities object; its ``config``, ``logger`` and
                ``file_handler`` attributes are used by this loader.
        """
        self.utils = utils
        self.config = utils.config
        self.logger = utils.logger
        self.file_handler = utils.file_handler

        # Check OCR capabilities
        self.ocr_enabled = self._check_ocr_capabilities()

    def _apply_tesseract_cmd(self) -> None:
        """Point pytesseract at the binary configured in ``ocr.tesseract_cmd``."""
        tesseract_cmd = self.config.get('ocr.tesseract_cmd')
        if tesseract_cmd and pytesseract is not None:
            pytesseract.pytesseract.tesseract_cmd = tesseract_cmd

    def _check_ocr_capabilities(self) -> bool:
        """Check if OCR is available and enabled.

        Returns:
            True only when ``ocr.enabled`` is set, PyMuPDF, Pillow and
            pytesseract are importable, and the Tesseract binary responds.
        """
        if not self.config.get('ocr.enabled', False):
            return False

        dependency_flags = (
            ('PyMuPDF', HAS_PYMUPDF),
            ('Pillow', HAS_PIL),
            ('pytesseract', HAS_TESSERACT),
        )
        missing = [name for name, present in dependency_flags if not present]
        if missing:
            self.logger.warning(f"OCR disabled: Missing dependencies: {', '.join(missing)}")
            self.logger.info("Install with: pip install pymupdf pillow pytesseract")
            return False

        # Apply the configured binary path BEFORE probing; otherwise a
        # Tesseract that is only reachable through `ocr.tesseract_cmd`
        # would always be reported as missing.
        self._apply_tesseract_cmd()

        # Check tesseract binary
        try:
            pytesseract.get_tesseract_version()
        except Exception:
            self.logger.warning("OCR disabled: Tesseract not found. Install: brew install tesseract (Mac) or apt-get install tesseract-ocr (Linux)")
            return False

        self.logger.info("āœ… OCR enabled (Tesseract available)")
        return True

    def load_pdf_files(self) -> List[Document]:
        """Load all PDF files with OCR fallback for scanned documents.

        PDFs are collected from the configured raw-data directory and every
        directory below it. A file that fails to load is logged and skipped.

        Returns:
            One ``Document`` per PDF page, or an empty list when no PDF is
            found.
        """
        # Get PDF files from raw directory and subdirectories
        raw_path = self.config.get_path('paths', 'data', 'raw')
        pdf_files = []

        if raw_path and raw_path.exists():
            # Direct files
            pdf_files.extend(self.file_handler.get_files_by_extension(raw_path, ['.pdf']))
            # Subdirectories
            for subdir in raw_path.rglob('*'):
                if subdir.is_dir() and subdir != raw_path:
                    pdf_files.extend(self.file_handler.get_files_by_extension(subdir, ['.pdf']))

        # Drop repeated paths while keeping discovery order. This is a no-op
        # unless the file-handler helper itself searches recursively.
        pdf_files = list(dict.fromkeys(pdf_files))

        self.logger.info(f"šŸ“š Found {len(pdf_files)} PDF files")

        if not pdf_files:
            self.logger.warning(f"No PDF files found in {raw_path}")
            return []

        all_docs = []
        ocr_count = 0

        for pdf_path in pdf_files:
            try:
                docs, ocr_pages = self._load_single_pdf(pdf_path)
            except Exception as e:
                self.logger.error(f" āŒ Failed to load {pdf_path.name}: {e}")
                continue
            all_docs.extend(docs)
            ocr_count += ocr_pages

        if ocr_count > 0:
            self.logger.info(f"šŸ“ø OCR applied to {ocr_count} pages")

        return all_docs

    def _load_single_pdf(self, pdf_path: Path) -> Tuple[List[Document], int]:
        """Load one PDF, tag its pages, and OCR the pages with too little text.

        Args:
            pdf_path: Path of the PDF to load.

        Returns:
            ``(documents, ocr_page_count)`` for this file.
        """
        self.logger.info(f"šŸ“„ Loading: {pdf_path.name}")

        # Load with PyMuPDF
        docs = PyMuPDFLoader(str(pdf_path)).load()
        ocr_pages = 0

        # Process each page
        for doc in docs:
            # Add metadata
            doc.metadata.update({
                'source': pdf_path.name,
                'file_path': str(pdf_path),
                'type': 'pdf'
            })

            # Extract Vivekananda-specific metadata
            self._add_vivekananda_metadata(doc, pdf_path.name)

            # OCR fallback for scanned/empty pages
            if not self._should_apply_ocr(doc):
                continue
            try:
                ocr_text = self._ocr_page(pdf_path, doc.metadata.get('page', 0))
                if ocr_text and len(ocr_text.strip()) > self._MIN_OCR_RESULT_CHARS:
                    doc.page_content = ocr_text
                    doc.metadata['ocr'] = True
                    ocr_pages += 1
                    self.logger.info(f" āœ“ OCR applied to page {doc.metadata.get('page', '?')}")
            except Exception as ocr_err:
                # OCR is best-effort: keep the page as extracted.
                self.logger.warning(f" āš ļø OCR failed on page {doc.metadata.get('page', '?')}: {ocr_err}")

        self.logger.info(f" āœ… Loaded {len(docs)} pages")
        return docs, ocr_pages

    def _should_apply_ocr(self, doc: Document) -> bool:
        """Determine if OCR should be applied.

        Returns:
            True when OCR is enabled, the page has not been OCR'd yet, and
            its stripped text is shorter than ``ocr.min_text_length``.
        """
        if not self.ocr_enabled:
            return False

        # Check if already OCR'd
        if doc.metadata.get('ocr', False):
            return False

        # Get minimum text length threshold from config
        min_length = self.config.get('ocr.min_text_length', 50)

        # Apply OCR if content is too short (likely scanned)
        stripped_length = len(doc.page_content.strip()) if doc.page_content else 0
        return stripped_length < min_length

    def _ocr_loaded_page(self, page, dpi: int, lang: str) -> str:
        """Render an opened PyMuPDF page to PNG and run Tesseract on it."""
        pix = page.get_pixmap(dpi=dpi)
        img = Image.open(io.BytesIO(pix.tobytes("png")))
        return pytesseract.image_to_string(img, lang=lang)

    def _ocr_page(self, pdf_path: Path, page_index: int) -> str:
        """OCR a single PDF page using PyMuPDF and Tesseract.

        Args:
            pdf_path: PDF to open.
            page_index: Zero-based page index. When it is outside the
                document's page range, every page is OCR'd and the non-empty
                results are joined with blank lines.

        Returns:
            The recognised text, or ``""`` when OCR raised an error (the
            error is logged).

        Raises:
            RuntimeError: If an OCR dependency is not importable.
        """
        if not all([HAS_PYMUPDF, HAS_PIL, HAS_TESSERACT]):
            raise RuntimeError("OCR dependencies not available")

        # Get OCR config
        lang = self.config.get('ocr.lang', 'eng')
        dpi = self.config.get('ocr.dpi', 300)

        # Set tesseract path if configured
        self._apply_tesseract_cmd()

        try:
            # Open PDF and get page
            with fitz.open(str(pdf_path)) as pdf_doc:
                if not (0 <= page_index < pdf_doc.page_count):
                    # OCR all pages
                    page_texts = (
                        self._ocr_loaded_page(pdf_doc.load_page(i), dpi, lang)
                        for i in range(pdf_doc.page_count)
                    )
                    return "\n\n".join(text for text in page_texts if text)

                # OCR specific page
                text = self._ocr_loaded_page(pdf_doc.load_page(page_index), dpi, lang)
                return text or ""
        except Exception as e:
            self.logger.error(f"OCR error: {e}")
            return ""

    def load_text_files(self) -> List[Document]:
        """Load all text files from the configured extracted-data directory.

        Returns:
            The loaded documents; empty when the directory does not exist.
            A file that fails to load is logged and skipped.
        """
        text_extensions = self.config.get('dataset.text.file_extensions', ['.txt', '.md'])
        extracted_path = self.config.get_path('paths', 'data', 'extracted')

        if not extracted_path or not extracted_path.exists():
            self.logger.warning(f"Text directory not found: {extracted_path}")
            return []

        text_files = self.file_handler.get_files_by_extension(extracted_path, text_extensions)
        self.logger.info(f"šŸ“ Found {len(text_files)} text files")

        # The encoding does not depend on the file, so read it once.
        encoding = self.config.get('dataset.text.encoding', 'utf-8')

        all_docs = []
        for text_path in text_files:
            try:
                self.logger.info(f"šŸ“„ Loading: {text_path.name}")
                docs = TextLoader(str(text_path), encoding=encoding).load()

                # Add metadata
                for doc in docs:
                    doc.metadata.update({
                        'source': text_path.name,
                        'file_path': str(text_path),
                        'type': 'text'
                    })
                    self._add_vivekananda_metadata(doc, text_path.name)

                all_docs.extend(docs)
                self.logger.info(f" āœ… Loaded {len(docs)} document(s)")
            except Exception as e:
                self.logger.error(f" āŒ Failed to load {text_path.name}: {e}")

        return all_docs

    def load_markdown_files(self) -> List[Document]:
        """Load markdown files with proper parsing.

        The markdown, raw and extracted data directories are searched
        recursively for ``*.md``. A file reachable from more than one of
        those roots is loaded only once. ``UnstructuredMarkdownLoader`` is
        tried first and ``TextLoader`` is the fallback.

        Returns:
            The loaded documents (possibly empty).
        """
        # Check multiple possible markdown locations
        markdown_paths = [
            self.config.get_path('paths', 'data', 'markdown'),
            self.config.get_path('paths', 'data', 'raw'),
            self.config.get_path('paths', 'data', 'extracted')
        ]

        all_docs = []
        seen_files = set()  # resolved paths already scheduled for loading

        for md_path in markdown_paths:
            if not md_path or not md_path.exists():
                continue

            # Find all .md files not already picked up from an earlier root
            md_files = []
            for candidate in md_path.rglob('*.md'):
                resolved = candidate.resolve()
                if resolved not in seen_files:
                    seen_files.add(resolved)
                    md_files.append(candidate)

            if not md_files:
                continue

            self.logger.info(f"šŸ“ Found {len(md_files)} markdown files in {md_path.name}")

            for md_file in md_files:
                try:
                    self.logger.info(f"šŸ“„ Loading: {md_file.name}")
                    docs = self._load_markdown_file(md_file)

                    # Add metadata
                    for doc in docs:
                        doc.metadata.update({
                            'source': md_file.name,
                            'file_path': str(md_file),
                            'type': 'markdown'
                        })
                        self._add_vivekananda_metadata(doc, md_file.name)

                    all_docs.extend(docs)
                    self.logger.info(f" āœ… Loaded {len(docs)} document(s)")
                except Exception as e:
                    self.logger.error(f" āŒ Failed to load {md_file.name}: {e}")

        if all_docs:
            self.logger.info(f"šŸ“š Total markdown documents: {len(all_docs)}")
        else:
            self.logger.warning("No markdown files found")

        return all_docs

    def _load_markdown_file(self, md_file: Path) -> List[Document]:
        """Parse one markdown file, falling back to plain-text loading."""
        # Try UnstructuredMarkdownLoader first
        try:
            return UnstructuredMarkdownLoader(str(md_file)).load()
        except Exception:
            # Catch Exception rather than using a bare except so that
            # Ctrl-C / SystemExit still propagate.
            # Fallback to TextLoader
            encoding = self.config.get('dataset.text.encoding', 'utf-8')
            return TextLoader(str(md_file), encoding=encoding).load()

    def load_json_dataset(self) -> List[Document]:
        """Load Q&A pairs from JSON as documents.

        Field names are read from ``dataset.json.fields``. Items lacking an
        instruction or a response are skipped with a warning.

        Returns:
            One ``Document`` per usable Q&A pair; empty when the dataset is
            not configured, missing, unreadable or empty.
        """
        json_file = self.config.get('dataset.json.file')
        processed_path = self.config.get_path('paths', 'data', 'processed')

        # Without both pieces there is nothing to join; report instead of
        # crashing on `None / ...`.
        if not json_file or not processed_path:
            self.logger.warning(f"JSON dataset not found: {processed_path}/{json_file}")
            return []

        json_path = processed_path / json_file

        if not json_path.exists():
            self.logger.warning(f"JSON dataset not found: {json_path}")
            return []

        self.logger.info(f"šŸ“Š Loading JSON dataset: {json_path.name}")

        data = self.file_handler.load_json(json_path)
        if not data:
            self.logger.error(f"Failed to load or empty JSON file: {json_path}")
            return []

        # Get field names from config
        fields = self.config.get('dataset.json.fields', {})
        instruction_field = fields.get('instruction', 'instruction')
        response_field = fields.get('response', 'response')
        source_field = fields.get('source', 'source')
        work_type_field = fields.get('work_type', 'work_type')
        topic_field = fields.get('topic', 'topic')

        docs = []
        for idx, item in enumerate(data):
            try:
                # Create document from Q&A pair
                instruction = item.get(instruction_field, '')
                response = item.get(response_field, '')

                if not instruction or not response:
                    self.logger.warning(f" Skipping item {idx}: missing instruction or response")
                    continue

                # Combine as context
                content = f"Question: {instruction}\n\nAnswer: {response}"

                docs.append(Document(
                    page_content=content,
                    metadata={
                        'instruction': instruction,
                        'response': response,
                        'source': item.get(source_field, 'JSON Dataset'),
                        'work_type': item.get(work_type_field, 'Q&A'),
                        'topic': item.get(topic_field, 'general'),
                        'type': 'qa_pair',
                        'index': idx
                    }
                ))
            except Exception as e:
                self.logger.warning(f" Error processing item {idx}: {e}")
                continue

        self.logger.info(f" āœ… Loaded {len(docs)} Q&A pairs")
        return docs

    def _add_vivekananda_metadata(self, doc: Document, filename: str):
        """Extract Vivekananda-specific metadata from filename and content.

        Sets ``volume`` (from the filename), ``work_type`` (keyword in the
        filename or the first 1000 characters; ``'General'`` when nothing
        matches and none is set) and ``special`` (keyword in the first 1000
        characters) on ``doc.metadata`` in place.
        """
        filename_lower = filename.lower()
        text_sample = doc.page_content[:1000].lower() if doc.page_content else ""

        # Detect volume number
        volume_match = _VOLUME_PATTERN.search(filename_lower)
        if volume_match:
            doc.metadata['volume'] = int(volume_match.group(1))

        # Detect work type
        for key, value in self._WORK_TYPE_MAP.items():
            if key in filename_lower or key in text_sample:
                doc.metadata['work_type'] = value
                break
        else:
            # No keyword matched: keep an existing value, else default.
            doc.metadata.setdefault('work_type', 'General')

        # Detect special content
        for keyword, special_name in self._SPECIAL_KEYWORDS.items():
            if keyword in text_sample:
                doc.metadata['special'] = special_name
                break

    @staticmethod
    def _file_key(doc: Document) -> Optional[str]:
        """Return the resolved ``file_path`` of a document, or None if unset."""
        file_path = doc.metadata.get('file_path')
        return str(Path(file_path).resolve()) if file_path else None

    def load_all_documents(self) -> List[Document]:
        """Load all documents from all configured sources.

        Sources are toggled through ``dataset.load`` (each defaults to
        enabled) and loaded in the order PDF, text, markdown, JSON. A file
        loaded by both the text and the markdown loader is kept only once,
        as its markdown-parsed version, so it is not embedded twice.

        Returns:
            All loaded documents in source order.
        """
        # Get load configuration
        load_cfg = self.config.get('dataset.load', {
            'pdf': True,
            'text': True,
            'markdown': True,
            'json': True
        })

        sources = (
            ('pdf', "LOADING PDF FILES", self.load_pdf_files),
            ('text', "LOADING TEXT FILES", self.load_text_files),
            ('markdown', "LOADING MARKDOWN FILES", self.load_markdown_files),
            ('json', "LOADING JSON DATASET", self.load_json_dataset),
        )

        loaded = {}
        for key, banner, loader in sources:
            if not load_cfg.get(key, True):
                continue
            self.logger.info("\n" + "="*60)
            self.logger.info(banner)
            self.logger.info("="*60)
            loaded[key] = loader()

        # The default text extensions include '.md', so the same file can
        # arrive from both loaders; prefer the markdown-parsed copy.
        markdown_files = {self._file_key(doc) for doc in loaded.get('markdown', [])}
        markdown_files.discard(None)
        text_docs = loaded.get('text')
        if markdown_files and text_docs:
            kept = [doc for doc in text_docs if self._file_key(doc) not in markdown_files]
            dropped = len(text_docs) - len(kept)
            if dropped:
                self.logger.info(f" Skipped {dropped} text document(s) also loaded as markdown")
            loaded['text'] = kept

        all_docs = []
        for key, _banner, _loader in sources:
            all_docs.extend(loaded.get(key, []))

        return all_docs


# ============================================================================
# TEXT PROCESSOR
# ============================================================================

class TextProcessor:
    """Process and chunk documents with NLP"""

    def __init__(self, utils, nlp_processor):
        """Store the shared utilities and the NLP processor.

        Args:
            utils: Project utilities object (``config`` and ``logger`` used).
            nlp_processor: Object whose ``preprocess_text`` is applied to
                each document's text.
        """
        self.utils = utils
        self.config = utils.config
        self.logger = utils.logger
        self.nlp_processor = nlp_processor

    def process_documents(self, documents: List[Document]) -> List[Document]:
        """Process documents with NLP pipeline.

        Returns:
            New documents with preprocessed text and copied metadata. A
            document whose preprocessing fails is passed through unchanged.
        """
        self.logger.info("Processing documents with NLP...")

        processed_docs = []
        for idx, doc in enumerate(documents):
            try:
                # Preprocess text
                processed_text = self.nlp_processor.preprocess_text(doc.page_content)

                # Create new document with processed text
                processed_docs.append(Document(
                    page_content=processed_text,
                    metadata=doc.metadata.copy()
                ))
            except Exception as e:
                self.logger.warning(f"Failed to process document {idx}: {e}")
                # Keep original if processing fails
                processed_docs.append(doc)

        self.logger.info(f"āœ… Processed {len(processed_docs)} documents")
        return processed_docs

    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        """Chunk documents using config-driven RecursiveCharacterTextSplitter.

        Chunk size, overlap and separators come from ``embeddings.chunk.*``.

        Returns:
            The chunks, each annotated with ``chunk_index``, ``char_count``
            and ``word_count`` metadata.
        """
        self.logger.info("Chunking documents...")

        # Get chunking config
        chunk_size = self.config.get('embeddings.chunk.size', 500)
        chunk_overlap = self.config.get('embeddings.chunk.overlap', 50)
        separators = self.config.get(
            'embeddings.chunk.separators',
            ["\n\n", "\n", ". ", "! ", "? ", "; ", " ", ""]
        )

        # Create text splitter
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=separators,
            keep_separator=True
        )

        # Split documents
        chunks = text_splitter.split_documents(documents)

        # Add chunk metadata
        for idx, chunk in enumerate(chunks):
            chunk.metadata['chunk_index'] = idx
            chunk.metadata['char_count'] = len(chunk.page_content)
            chunk.metadata['word_count'] = len(chunk.page_content.split())

        self.logger.info(f"āœ… Created {len(chunks)} chunks")
        return chunks


# ============================================================================
# EMBEDDING CREATOR
# ============================================================================

class EmbeddingCreator:
    """Create embeddings and vector store using LangChain"""

    def __init__(self, utils):
        """Store the shared utilities and the device embeddings should use."""
        self.utils = utils
        self.config = utils.config
        self.logger = utils.logger
        self.device = utils.device_manager.device

    def create_embeddings_model(self) -> Optional[HuggingFaceEmbeddings]:
        """Create embeddings model from config.

        Returns:
            The embeddings model, or None when ``embeddings.use_hf`` is
            false or the model cannot be created (the error is logged).
        """
        # Check if embeddings are enabled
        if not self.config.get('embeddings.use_hf', True):
            self.logger.warning("HuggingFace embeddings disabled in config")
            return None

        model_name = self.config.get('embeddings.model_name')
        normalize = self.config.get('embeddings.normalize', True)
        batch_size = self.config.get('embeddings.batch_size', 32)

        self.logger.info(f"Creating embeddings model: {model_name}")
        self.logger.info(f"Device: {self.device}")

        try:
            # Create embeddings
            embeddings = HuggingFaceEmbeddings(
                model_name=model_name,
                model_kwargs={'device': self.device},
                encode_kwargs={
                    'batch_size': batch_size,
                    'normalize_embeddings': normalize
                }
            )
            self.logger.info("āœ… Embeddings model created")
            return embeddings
        except Exception as e:
            self.logger.error(f"Failed to create embeddings model: {e}")
            return None

    def create_fallback_tfidf_index(self, chunks: List[Document]):
        """Create minimal TF-IDF artifacts for retrieval-only fallback.

        Writes the non-empty chunk texts to ``faiss_index/texts.json`` under
        the vector-store root.

        Returns:
            The directory the texts were written to.
        """
        texts = [c.page_content for c in chunks if c.page_content]

        root = self.config.get_path('paths', 'vectorstore', 'root')
        fallback_dir = root / 'faiss_index'
        fallback_dir.mkdir(parents=True, exist_ok=True)

        with open(fallback_dir / 'texts.json', 'w', encoding='utf-8') as f:
            json.dump(texts, f, ensure_ascii=False)

        self.logger.info(f"āœ… Saved TF-IDF fallback texts: {fallback_dir}")
        return fallback_dir

    def create_vector_store(self, chunks: List[Document], embeddings: HuggingFaceEmbeddings) -> FAISS:
        """Create FAISS vector store from chunks.

        Raises:
            Exception: Whatever ``FAISS.from_documents`` raises, after it
                has been logged.
        """
        self.logger.info("Creating FAISS vector store...")

        try:
            # Create vector store
            vectorstore = FAISS.from_documents(
                documents=chunks,
                embedding=embeddings
            )
            self.logger.info(f"āœ… Vector store created with {vectorstore.index.ntotal:,} vectors")
            return vectorstore
        except Exception as e:
            self.logger.error(f"Failed to create vector store: {e}")
            raise

    def save_vector_store(self, vectorstore: FAISS):
        """Save vector store to disk.

        The target is ``<vectorstore root>/<paths.vectorstore.db_name>``.

        Raises:
            Exception: Whatever ``save_local`` raises, after it has been
                logged.
        """
        save_path = (
            self.config.get_path('paths', 'vectorstore', 'root')
            / self.config.get('paths.vectorstore.db_name')
        )
        save_path.mkdir(parents=True, exist_ok=True)

        self.logger.info(f"Saving vector store to: {save_path}")

        try:
            vectorstore.save_local(str(save_path))
            self.logger.info("āœ… Vector store saved successfully")
        except Exception as e:
            self.logger.error(f"Failed to save vector store: {e}")
            raise


# ============================================================================
# VERIFICATION
# ============================================================================

class VectorStoreVerifier:
    """Verify vector store quality"""

    def __init__(self, utils):
        """Store the shared utilities (``config`` and ``logger`` are used)."""
        self.utils = utils
        self.config = utils.config
        self.logger = utils.logger

    def verify(self, vectorstore: FAISS) -> bool:
        """Verify vector store with test queries.

        Returns:
            False when the store is empty, or when any test query fails or
            returns no result; True otherwise.
        """
        self.logger.info("\n" + "="*60)
        self.logger.info("VERIFICATION")
        self.logger.info("="*60)

        # Check size
        total_vectors = vectorstore.index.ntotal
        self.logger.info(f"Total vectors: {total_vectors:,}")

        if total_vectors == 0:
            self.logger.error("āŒ Vector store is empty!")
            return False

        # Get test queries from config
        test_queries = self.config.get('evaluation.test_queries', [
            "What is Karma Yoga?",
            "How can I overcome fear?",
            "What is the purpose of meditation?"
        ])

        # Test retrieval
        top_k = self.config.get('rag.retrieval.top_k', 5)

        success = True
        for query in test_queries:
            self.logger.info(f"\nšŸ” Test query: '{query}'")
            try:
                results = vectorstore.similarity_search_with_score(query, k=top_k)
                if results:
                    doc, score = results[0]
                    self.logger.info(f" āœ… Best match score: {score:.4f}")
                    self.logger.info(f" šŸ“– Source: {doc.metadata.get('source', 'Unknown')}")
                    self.logger.info(f" šŸ“„ Preview: {doc.page_content[:100]}...")
                else:
                    self.logger.warning(f" āš ļø No results found")
                    success = False
            except Exception as e:
                self.logger.error(f" āŒ Query failed: {e}")
                success = False

        if success:
            self.logger.info("\nāœ… Verification completed successfully")
        else:
            self.logger.warning("\nāš ļø Verification completed with warnings")

        return success


# ============================================================================
# STATISTICS
# ============================================================================

class DatasetStatistics:
    """Calculate and display dataset statistics"""

    def __init__(self, utils):
        """Store the shared utilities (only ``logger`` is used)."""
        self.utils = utils
        self.logger = utils.logger

    def calculate(self, documents: List[Document], chunks: List[Document]):
        """Calculate comprehensive statistics and log them.

        Args:
            documents: Loaded documents; counted by ``type`` and
                ``work_type`` metadata.
            chunks: Chunks produced from the documents; used for the size
                summary.
        """
        self.logger.info("\n" + "="*70)
        self.logger.info("DATASET STATISTICS")
        self.logger.info("="*70)

        # Document statistics
        doc_types = Counter(doc.metadata.get('type', 'unknown') for doc in documents)
        work_types = Counter(doc.metadata.get('work_type', 'unknown') for doc in documents)
        total_chars = sum(len(doc.page_content) for doc in documents if doc.page_content)
        ocr_count = sum(1 for doc in documents if doc.metadata.get('ocr', False))

        self.logger.info(f"\nšŸ“Š Documents:")
        self.logger.info(f" Total: {len(documents)}")
        self.logger.info(f" By type:")
        for dtype, count in sorted(doc_types.items()):
            self.logger.info(f" - {dtype}: {count}")

        if ocr_count > 0:
            self.logger.info(f" OCR applied: {ocr_count} pages")

        self.logger.info(f"\nšŸ“š Work types:")
        for wtype, count in sorted(work_types.items()):
            self.logger.info(f" - {wtype}: {count}")

        self.logger.info(f"\nšŸ“ Content:")
        self.logger.info(f" Total characters: {total_chars:,}")
        # Rough estimate: five characters per word.
        self.logger.info(f" Estimated words: {total_chars//5:,}")

        # Chunk statistics
        self.logger.info(f"\n🧩 Chunks:")
        self.logger.info(f" Total chunks: {len(chunks)}")

        if chunks:
            chunk_sizes = [len(chunk.page_content) for chunk in chunks if chunk.page_content]
            if chunk_sizes:
                avg_size = sum(chunk_sizes) / len(chunk_sizes)
                self.logger.info(f" Average chunk size: {avg_size:.0f} characters")
                self.logger.info(f" Min chunk size: {min(chunk_sizes)}")
                self.logger.info(f" Max chunk size: {max(chunk_sizes)}")


# ============================================================================
# MAIN EXECUTION
# ============================================================================

def main():
    """Main execution flow.

    Loads documents, preprocesses and chunks them, builds and saves the
    FAISS store (or writes the TF-IDF fallback texts when no embeddings
    model is available), then verifies the store.

    Returns:
        0 on success, 1 on failure, interruption or verification warnings.
    """
    # Initialize utilities
    utils = get_utils()
    logger = utils.logger

    logger.info("="*70)
    logger.info("šŸ•‰ļø VIVEKANANDA AI - VECTOR DATABASE CREATION")
    logger.info("="*70)

    try:
        # Initialize components
        logger.info("\nšŸ”§ Initializing components...")

        nlp_processor = create_nlp_processor(utils.config, logger)
        document_loader = DocumentLoader(utils)
        text_processor = TextProcessor(utils, nlp_processor)
        embedding_creator = EmbeddingCreator(utils)
        verifier = VectorStoreVerifier(utils)
        stats = DatasetStatistics(utils)

        # Step 1: Load documents
        logger.info("\n" + "="*70)
        logger.info("STEP 1: LOADING DOCUMENTS")
        logger.info("="*70)

        documents = document_loader.load_all_documents()

        if not documents:
            logger.error("\nāŒ No documents loaded! Please add files to data directory.")
            logger.info("\nšŸ“ Expected locations:")
            logger.info(f" - PDFs: {utils.config.get_path('paths', 'data', 'raw')}")
            logger.info(f" - Text: {utils.config.get_path('paths', 'data', 'extracted')}")
            logger.info(f" - Markdown: {utils.config.get_path('paths', 'data', 'markdown')}")
            logger.info(f" - JSON: {utils.config.get_path('paths', 'data', 'processed')}/{utils.config.get('dataset.json.file')}")
            return 1

        logger.info(f"\nāœ… Loaded {len(documents)} total documents")

        # Step 2: Process documents
        logger.info("\n" + "="*70)
        logger.info("STEP 2: NLP PROCESSING")
        logger.info("="*70)

        processed_docs = text_processor.process_documents(documents)

        # Step 3: Chunk documents
        logger.info("\n" + "="*70)
        logger.info("STEP 3: CHUNKING")
        logger.info("="*70)

        chunks = text_processor.chunk_documents(processed_docs)

        if not chunks:
            logger.error("\nāŒ No chunks created!")
            return 1

        # Show statistics
        stats.calculate(documents, chunks)

        # Step 4: Create embeddings
        logger.info("\n" + "="*70)
        logger.info("STEP 4: CREATING EMBEDDINGS")
        logger.info("="*70)

        embeddings_model = embedding_creator.create_embeddings_model()

        if embeddings_model is None:
            logger.warning("āš ļø Embeddings model unavailable. Creating TF-IDF fallback artifacts for retrieval.")
            embedding_creator.create_fallback_tfidf_index(chunks)
            vectorstore = None
        else:
            vectorstore = embedding_creator.create_vector_store(chunks, embeddings_model)

        # Step 5: Save vector store
        logger.info("\n" + "="*70)
        logger.info("STEP 5: SAVING")
        logger.info("="*70)

        if vectorstore is not None:
            embedding_creator.save_vector_store(vectorstore)

        # Step 6: Verify (skipped, and counted as success, in fallback mode)
        success = True
        if vectorstore is not None:
            success = verifier.verify(vectorstore)

        # Final summary
        logger.info("\n" + "="*70)
        if success:
            logger.info("āœ… SUCCESS! VECTOR DATABASE READY")
        else:
            logger.info("āš ļø COMPLETED WITH WARNINGS")
        logger.info("="*70)

        logger.info(f"\nšŸ“Š Summary:")
        logger.info(f" Total documents: {len(documents)}")
        logger.info(f" Total chunks: {len(chunks)}")
        logger.info(f" Vector store: {utils.config.get_path('paths', 'vectorstore', 'root')} (or TF-IDF fallback texts)")

        logger.info(f"\nšŸš€ Next steps:")
        logger.info(f" 1. Test retrieval: python scripts/02_query_rag.py")
        logger.info(f" 2. Test model: python scripts/03_test_mistral.py")
        logger.info(f" 3. Run Streamlit: streamlit run app.py")

        return 0 if success else 1

    except KeyboardInterrupt:
        logger.warning("\nāš ļø Interrupted by user")
        return 1
    except Exception as e:
        logger.error(f"\nāŒ FATAL ERROR: {e}", exc_info=True)
        return 1
    finally:
        # Cleanup
        utils.optimize_memory()


if __name__ == "__main__":
    sys.exit(main())