| """ |
| Create Vector Database with Embeddings and RAG |
| Enhanced with OCR for scanned PDFs and Markdown support |
| Pure LangChain implementation, config-driven, no hardcoding |
| """ |
|
|
| import sys |
| from pathlib import Path |
| sys.path.append(str(Path(__file__).resolve().parents[1])) |
| from typing import List, Dict, Any, Optional |
| import json |
| import io |
|
|
| |
| from langchain_core.documents import Document |
| from langchain_text_splitters import RecursiveCharacterTextSplitter |
| from langchain_community.vectorstores import FAISS |
| from langchain_huggingface import HuggingFaceEmbeddings |
| from langchain_community.document_loaders import ( |
| PyMuPDFLoader, |
| TextLoader, |
| DirectoryLoader, |
| UnstructuredMarkdownLoader |
| ) |
|
|
| |
# Optional OCR dependencies. Each is probed independently so the script can
# still run (with OCR disabled) when any of them is missing; the HAS_* flags
# are checked later by DocumentLoader._check_ocr_capabilities.

# PyMuPDF (imported as `fitz`): used to render PDF pages to images for OCR.
try:
    import fitz
    HAS_PYMUPDF = True
except ImportError:
    HAS_PYMUPDF = False
    fitz = None


# Pillow: wraps the rendered page bytes as an Image for pytesseract.
try:
    from PIL import Image
    HAS_PIL = True
except ImportError:
    HAS_PIL = False
    Image = None


# pytesseract: Python bindings for the Tesseract OCR binary. The binary
# itself may still be absent even when this import succeeds.
try:
    import pytesseract
    HAS_TESSERACT = True
except ImportError:
    HAS_TESSERACT = False
    pytesseract = None
|
|
| |
| from utils import get_utils |
| from nlp_processor import create_nlp_processor |
|
|
| |
| |
| |
|
|
class DocumentLoader:
    """Load documents from PDFs, text, markdown and a JSON Q&A dataset.

    All paths, file extensions and OCR settings come from the config object
    carried by ``utils`` -- nothing is hardcoded here. Scanned PDF pages with
    little or no extractable text are re-processed with OCR when available.
    """

    def __init__(self, utils):
        # utils is the project-wide helper bundle (config, logger, file I/O).
        self.utils = utils
        self.config = utils.config
        self.logger = utils.logger
        self.file_handler = utils.file_handler

        # Resolved once: True only when config enables OCR AND all optional
        # dependencies (PyMuPDF, Pillow, pytesseract + binary) are usable.
        self.ocr_enabled = self._check_ocr_capabilities()

    def _check_ocr_capabilities(self) -> bool:
        """Return True when OCR is enabled in config and fully available."""
        if not self.config.get('ocr.enabled', False):
            return False

        if not all([HAS_PYMUPDF, HAS_PIL, HAS_TESSERACT]):
            missing = []
            if not HAS_PYMUPDF:
                missing.append('PyMuPDF')
            if not HAS_PIL:
                missing.append('Pillow')
            if not HAS_TESSERACT:
                missing.append('pytesseract')

            self.logger.warning(f"OCR disabled: Missing dependencies: {', '.join(missing)}")
            self.logger.info("Install with: pip install pymupdf pillow pytesseract")
            return False

        # pytesseract imports fine even when the tesseract binary is absent,
        # so probe the binary explicitly.
        try:
            pytesseract.get_tesseract_version()
            self.logger.info("✅ OCR enabled (Tesseract available)")
            return True
        except Exception:
            self.logger.warning("OCR disabled: Tesseract not found. Install: brew install tesseract (Mac) or apt-get install tesseract-ocr (Linux)")
            return False

    def load_pdf_files(self) -> List["Document"]:
        """Load every PDF under the raw data directory (including subdirs).

        Returns one Document per page; pages whose extracted text is shorter
        than the configured minimum are replaced by their OCR text.
        """
        raw_path = self.config.get_path('paths', 'data', 'raw')
        pdf_files = []

        if raw_path and raw_path.exists():
            pdf_files.extend(self.file_handler.get_files_by_extension(raw_path, ['.pdf']))

            # Also scan nested subdirectories.
            for subdir in raw_path.rglob('*'):
                if subdir.is_dir() and subdir != raw_path:
                    pdf_files.extend(self.file_handler.get_files_by_extension(subdir, ['.pdf']))

        # De-duplicate while preserving order: if get_files_by_extension is
        # itself recursive the two scans above overlap (TODO confirm whether
        # it recurses; dedupe is harmless either way).
        pdf_files = list(dict.fromkeys(pdf_files))

        self.logger.info(f"📁 Found {len(pdf_files)} PDF files")

        if not pdf_files:
            self.logger.warning(f"No PDF files found in {raw_path}")
            return []

        all_docs = []
        ocr_count = 0

        for pdf_path in pdf_files:
            try:
                self.logger.info(f"📄 Loading: {pdf_path.name}")

                loader = PyMuPDFLoader(str(pdf_path))
                docs = loader.load()

                for doc in docs:
                    doc.metadata.update({
                        'source': pdf_path.name,
                        'file_path': str(pdf_path),
                        'type': 'pdf'
                    })

                    self._add_vivekananda_metadata(doc, pdf_path.name)

                    # OCR fallback for scanned pages with little/no text.
                    if self._should_apply_ocr(doc):
                        try:
                            ocr_text = self._ocr_page(pdf_path, doc.metadata.get('page', 0))
                            # Only accept OCR output that is substantial
                            # enough to be real text, not noise.
                            if ocr_text and len(ocr_text.strip()) > 50:
                                doc.page_content = ocr_text
                                doc.metadata['ocr'] = True
                                ocr_count += 1
                                self.logger.info(f"  ✅ OCR applied to page {doc.metadata.get('page', '?')}")
                        except Exception as ocr_err:
                            self.logger.warning(f"  ⚠️ OCR failed on page {doc.metadata.get('page', '?')}: {ocr_err}")

                all_docs.extend(docs)
                self.logger.info(f"  ✅ Loaded {len(docs)} pages")

            except Exception as e:
                # One unreadable PDF must not abort the whole run.
                self.logger.error(f"  ❌ Failed to load {pdf_path.name}: {e}")
                continue

        if ocr_count > 0:
            self.logger.info(f"📸 OCR applied to {ocr_count} pages")

        return all_docs

    def _should_apply_ocr(self, doc: "Document") -> bool:
        """Return True when a page's extracted text is too short and OCR is on."""
        if not self.ocr_enabled:
            return False

        # Never OCR a page twice.
        if doc.metadata.get('ocr', False):
            return False

        min_length = self.config.get('ocr.min_text_length', 50)
        content_length = len(doc.page_content.strip()) if doc.page_content else 0

        return content_length < min_length

    def _render_and_ocr(self, pdf_doc, page_number: int, dpi: int, lang: str) -> str:
        """Render one page of an open PyMuPDF document and OCR it."""
        page = pdf_doc.load_page(page_number)
        pix = page.get_pixmap(dpi=dpi)
        img = Image.open(io.BytesIO(pix.tobytes("png")))
        return pytesseract.image_to_string(img, lang=lang)

    def _ocr_page(self, pdf_path: Path, page_index: int) -> str:
        """OCR a single PDF page using PyMuPDF and Tesseract.

        An out-of-range ``page_index`` falls back to OCR-ing every page and
        joining the results. Returns "" on failure (errors are logged).
        """
        if not all([HAS_PYMUPDF, HAS_PIL, HAS_TESSERACT]):
            raise RuntimeError("OCR dependencies not available")

        lang = self.config.get('ocr.lang', 'eng')
        dpi = self.config.get('ocr.dpi', 300)

        # Allow a custom tesseract binary location via config.
        tesseract_cmd = self.config.get('ocr.tesseract_cmd')
        if tesseract_cmd:
            pytesseract.pytesseract.tesseract_cmd = tesseract_cmd

        try:
            with fitz.open(str(pdf_path)) as pdf_doc:
                if page_index < 0 or page_index >= pdf_doc.page_count:
                    # Index unusable: OCR the whole document instead.
                    texts = []
                    for i in range(pdf_doc.page_count):
                        text = self._render_and_ocr(pdf_doc, i, dpi, lang)
                        if text:
                            texts.append(text)
                    return "\n\n".join(texts)

                return self._render_and_ocr(pdf_doc, page_index, dpi, lang) or ""

        except Exception as e:
            self.logger.error(f"OCR error: {e}")
            return ""

    def load_text_files(self) -> List["Document"]:
        """Load plain-text files from the extracted-data directory."""
        text_extensions = self.config.get('dataset.text.file_extensions', ['.txt', '.md'])
        extracted_path = self.config.get_path('paths', 'data', 'extracted')

        # NOTE(review): the default extensions include '.md', and
        # load_markdown_files also scans the extracted dir, so .md files may
        # be ingested twice with default config -- confirm this is intended.
        if not extracted_path or not extracted_path.exists():
            self.logger.warning(f"Text directory not found: {extracted_path}")
            return []

        text_files = self.file_handler.get_files_by_extension(extracted_path, text_extensions)

        self.logger.info(f"📁 Found {len(text_files)} text files")

        all_docs = []
        for text_path in text_files:
            try:
                self.logger.info(f"📄 Loading: {text_path.name}")

                encoding = self.config.get('dataset.text.encoding', 'utf-8')
                loader = TextLoader(str(text_path), encoding=encoding)
                docs = loader.load()

                for doc in docs:
                    doc.metadata.update({
                        'source': text_path.name,
                        'file_path': str(text_path),
                        'type': 'text'
                    })
                    self._add_vivekananda_metadata(doc, text_path.name)

                all_docs.extend(docs)
                self.logger.info(f"  ✅ Loaded {len(docs)} document(s)")

            except Exception as e:
                self.logger.error(f"  ❌ Failed to load {text_path.name}: {e}")

        return all_docs

    def load_markdown_files(self) -> List["Document"]:
        """Load markdown files with structured parsing and a plain-text fallback."""
        markdown_paths = [
            self.config.get_path('paths', 'data', 'markdown'),
            self.config.get_path('paths', 'data', 'raw'),
            self.config.get_path('paths', 'data', 'extracted')
        ]

        all_docs = []
        # Resolved paths already loaded -- the search roots may overlap (e.g.
        # a markdown dir nested under raw), which previously double-loaded.
        seen = set()

        for md_path in markdown_paths:
            if not md_path or not md_path.exists():
                continue

            md_files = [p for p in md_path.rglob('*.md') if p.resolve() not in seen]
            seen.update(p.resolve() for p in md_files)

            if not md_files:
                continue

            self.logger.info(f"📁 Found {len(md_files)} markdown files in {md_path.name}")

            for md_file in md_files:
                try:
                    self.logger.info(f"📄 Loading: {md_file.name}")

                    # Prefer UnstructuredMarkdownLoader (structure-aware);
                    # fall back to a plain TextLoader when it fails.
                    try:
                        loader = UnstructuredMarkdownLoader(str(md_file))
                        docs = loader.load()
                    except Exception:  # was a bare `except:` -- narrowed
                        encoding = self.config.get('dataset.text.encoding', 'utf-8')
                        loader = TextLoader(str(md_file), encoding=encoding)
                        docs = loader.load()

                    for doc in docs:
                        doc.metadata.update({
                            'source': md_file.name,
                            'file_path': str(md_file),
                            'type': 'markdown'
                        })
                        self._add_vivekananda_metadata(doc, md_file.name)

                    all_docs.extend(docs)
                    self.logger.info(f"  ✅ Loaded {len(docs)} document(s)")

                except Exception as e:
                    self.logger.error(f"  ❌ Failed to load {md_file.name}: {e}")

        if all_docs:
            self.logger.info(f"📊 Total markdown documents: {len(all_docs)}")
        else:
            self.logger.warning("No markdown files found")

        return all_docs

    def load_json_dataset(self) -> List["Document"]:
        """Load Q&A pairs from the configured JSON file as Documents."""
        json_file = self.config.get('dataset.json.file')
        processed_path = self.config.get_path('paths', 'data', 'processed')

        # Guard against missing config: either value being None would
        # otherwise raise a TypeError when building the path below.
        if not json_file or not processed_path:
            self.logger.warning("JSON dataset not configured (dataset.json.file or processed path missing)")
            return []

        json_path = processed_path / json_file

        if not json_path.exists():
            self.logger.warning(f"JSON dataset not found: {json_path}")
            return []

        self.logger.info(f"📄 Loading JSON dataset: {json_path.name}")

        data = self.file_handler.load_json(json_path)

        if not data:
            self.logger.error(f"Failed to load or empty JSON file: {json_path}")
            return []

        # Field names are configurable so datasets with different schemas work.
        fields = self.config.get('dataset.json.fields', {})
        instruction_field = fields.get('instruction', 'instruction')
        response_field = fields.get('response', 'response')
        source_field = fields.get('source', 'source')
        work_type_field = fields.get('work_type', 'work_type')
        topic_field = fields.get('topic', 'topic')

        docs = []
        for idx, item in enumerate(data):
            try:
                instruction = item.get(instruction_field, '')
                response = item.get(response_field, '')

                if not instruction or not response:
                    self.logger.warning(f" Skipping item {idx}: missing instruction or response")
                    continue

                # Embed Q and A together so retrieval can match either side.
                content = f"Question: {instruction}\n\nAnswer: {response}"

                doc = Document(
                    page_content=content,
                    metadata={
                        'instruction': instruction,
                        'response': response,
                        'source': item.get(source_field, 'JSON Dataset'),
                        'work_type': item.get(work_type_field, 'Q&A'),
                        'topic': item.get(topic_field, 'general'),
                        'type': 'qa_pair',
                        'index': idx
                    }
                )

                docs.append(doc)

            except Exception as e:
                self.logger.warning(f" Error processing item {idx}: {e}")
                continue

        self.logger.info(f"  ✅ Loaded {len(docs)} Q&A pairs")
        return docs

    def _add_vivekananda_metadata(self, doc: "Document", filename: str):
        """Derive volume / work_type / special-work tags from filename + text.

        Mutates ``doc.metadata`` in place. Matching is keyword-based against
        the lowercased filename and the first 1000 chars of content.
        """
        import re  # local import keeps the module import block unchanged

        filename_lower = filename.lower()
        text_sample = doc.page_content[:1000].lower() if doc.page_content else ""

        # "volume_3", "volume 3", "volume-3" -> metadata['volume'] = 3
        volume_match = re.search(r'volume[_\s-]?(\d+)', filename_lower)
        if volume_match:
            doc.metadata['volume'] = int(volume_match.group(1))

        # First matching keyword wins (dict order is significant).
        work_type_map = {
            'karma': 'Karma-Yoga',
            'raja': 'Raja-Yoga',
            'bhakti': 'Bhakti-Yoga',
            'jnana': 'Jnana-Yoga',
            'letter': 'Letters',
            'speech': 'Speeches',
            'address': 'Speeches',
            'inspired': 'Inspired Talks',
            'talk': 'Talks',
            'lecture': 'Lectures'
        }

        for key, value in work_type_map.items():
            if key in filename_lower or key in text_sample:
                doc.metadata['work_type'] = value
                break
        else:
            # Don't clobber a work_type set earlier (e.g. by the JSON loader).
            if 'work_type' not in doc.metadata:
                doc.metadata['work_type'] = 'General'

        # Flag famous works; checked against content only, not the filename.
        special_keywords = {
            'chicago': 'Chicago Address',
            'parliament': 'Parliament Speech',
            'colombo': 'Colombo to Almora',
            'almora': 'Colombo to Almora'
        }

        for keyword, special_name in special_keywords.items():
            if keyword in text_sample:
                doc.metadata['special'] = special_name
                break

    def load_all_documents(self) -> List["Document"]:
        """Load documents from every source enabled in dataset.load config."""
        all_docs = []

        # Each source can be toggled off individually; all default to on.
        load_cfg = self.config.get('dataset.load', {
            'pdf': True,
            'text': True,
            'markdown': True,
            'json': True
        })

        if load_cfg.get('pdf', True):
            self.logger.info("\n" + "="*60)
            self.logger.info("LOADING PDF FILES")
            self.logger.info("="*60)
            all_docs.extend(self.load_pdf_files())

        if load_cfg.get('text', True):
            self.logger.info("\n" + "="*60)
            self.logger.info("LOADING TEXT FILES")
            self.logger.info("="*60)
            all_docs.extend(self.load_text_files())

        if load_cfg.get('markdown', True):
            self.logger.info("\n" + "="*60)
            self.logger.info("LOADING MARKDOWN FILES")
            self.logger.info("="*60)
            all_docs.extend(self.load_markdown_files())

        if load_cfg.get('json', True):
            self.logger.info("\n" + "="*60)
            self.logger.info("LOADING JSON DATASET")
            self.logger.info("="*60)
            all_docs.extend(self.load_json_dataset())

        return all_docs
|
|
| |
| |
| |
|
|
class TextProcessor:
    """Run NLP preprocessing over documents and split them into chunks."""

    def __init__(self, utils, nlp_processor):
        self.utils = utils
        self.config = utils.config
        self.logger = utils.logger
        # Project NLP pipeline; only preprocess_text() is used here.
        self.nlp_processor = nlp_processor

    def process_documents(self, documents: List["Document"]) -> List["Document"]:
        """Return new Documents whose text has been NLP-preprocessed.

        A document that fails preprocessing is passed through unchanged so a
        single bad page never aborts (or shrinks) the pipeline.
        """
        self.logger.info("Processing documents with NLP...")

        processed_docs = []
        for idx, doc in enumerate(documents):
            try:
                processed_text = self.nlp_processor.preprocess_text(doc.page_content)

                processed_docs.append(Document(
                    page_content=processed_text,
                    metadata=doc.metadata.copy()
                ))

            except Exception as e:
                self.logger.warning(f"Failed to process document {idx}: {e}")
                # Keep the original document rather than dropping it.
                processed_docs.append(doc)

        self.logger.info(f"✅ Processed {len(processed_docs)} documents")
        return processed_docs

    def chunk_documents(self, documents: List["Document"]) -> List["Document"]:
        """Chunk documents with a config-driven RecursiveCharacterTextSplitter."""
        self.logger.info("Chunking documents...")

        chunk_size = self.config.get('embeddings.chunk.size', 500)
        chunk_overlap = self.config.get('embeddings.chunk.overlap', 50)
        separators = self.config.get('embeddings.chunk.separators',
                                     ["\n\n", "\n", ". ", "! ", "? ", "; ", " ", ""])

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=separators,
            keep_separator=True
        )

        chunks = text_splitter.split_documents(documents)

        # Attach per-chunk bookkeeping metadata for later statistics/filtering.
        for idx, chunk in enumerate(chunks):
            chunk.metadata['chunk_index'] = idx
            chunk.metadata['char_count'] = len(chunk.page_content)
            chunk.metadata['word_count'] = len(chunk.page_content.split())

        self.logger.info(f"✅ Created {len(chunks)} chunks")
        return chunks
|
|
| |
| |
| |
|
|
class EmbeddingCreator:
    """Create embeddings and the FAISS vector store via LangChain."""

    def __init__(self, utils):
        self.utils = utils
        self.config = utils.config
        self.logger = utils.logger
        # Target device (e.g. cpu/cuda/mps) resolved by the project device
        # manager -- TODO confirm the exact value type expected by HF.
        self.device = utils.device_manager.device

    def create_embeddings_model(self) -> Optional["HuggingFaceEmbeddings"]:
        """Build the HuggingFace embeddings model from config.

        Returns None when HF embeddings are disabled in config or model
        creation fails (errors are logged, not raised).
        """
        if not self.config.get('embeddings.use_hf', True):
            self.logger.warning("HuggingFace embeddings disabled in config")
            return None

        model_name = self.config.get('embeddings.model_name')
        normalize = self.config.get('embeddings.normalize', True)
        batch_size = self.config.get('embeddings.batch_size', 32)

        self.logger.info(f"Creating embeddings model: {model_name}")
        self.logger.info(f"Device: {self.device}")

        try:
            embeddings = HuggingFaceEmbeddings(
                model_name=model_name,
                model_kwargs={'device': self.device},
                encode_kwargs={
                    'batch_size': batch_size,
                    'normalize_embeddings': normalize
                }
            )

            self.logger.info("✅ Embeddings model created")
            return embeddings

        except Exception as e:
            self.logger.error(f"Failed to create embeddings model: {e}")
            return None

    def create_fallback_tfidf_index(self, chunks: List["Document"]) -> Path:
        """Persist chunk texts so a TF-IDF retriever can be built later.

        Writes texts.json under the vectorstore root and returns that
        directory. Raises ValueError when the root path is unconfigured
        (previously an opaque TypeError).
        """
        texts = [c.page_content for c in chunks if c.page_content]
        root = self.config.get_path('paths', 'vectorstore', 'root')
        if root is None:
            raise ValueError("paths.vectorstore.root is not configured")
        fallback_dir = root / 'faiss_index'
        fallback_dir.mkdir(parents=True, exist_ok=True)
        with open(fallback_dir / 'texts.json', 'w', encoding='utf-8') as f:
            json.dump(texts, f, ensure_ascii=False)
        self.logger.info(f"✅ Saved TF-IDF fallback texts: {fallback_dir}")
        return fallback_dir

    def create_vector_store(self, chunks: List["Document"], embeddings: "HuggingFaceEmbeddings") -> "FAISS":
        """Create a FAISS vector store from the chunks. Raises on failure."""
        self.logger.info("Creating FAISS vector store...")

        try:
            vectorstore = FAISS.from_documents(
                documents=chunks,
                embedding=embeddings
            )

            self.logger.info(f"✅ Vector store created with {vectorstore.index.ntotal:,} vectors")
            return vectorstore

        except Exception as e:
            self.logger.error(f"Failed to create vector store: {e}")
            raise

    def save_vector_store(self, vectorstore: "FAISS"):
        """Persist the FAISS index under paths.vectorstore.root/<db_name>."""
        # Default the directory name when db_name is unset (previously
        # `root / None` raised TypeError); 'faiss_index' matches the TF-IDF
        # fallback directory so artifacts land in a predictable place.
        db_name = self.config.get('paths.vectorstore.db_name') or 'faiss_index'
        save_path = self.config.get_path('paths', 'vectorstore', 'root') / db_name
        save_path.mkdir(parents=True, exist_ok=True)

        self.logger.info(f"Saving vector store to: {save_path}")

        try:
            vectorstore.save_local(str(save_path))
            self.logger.info("✅ Vector store saved successfully")
        except Exception as e:
            self.logger.error(f"Failed to save vector store: {e}")
            raise
|
|
| |
| |
| |
|
|
class VectorStoreVerifier:
    """Sanity-check the vector store with config-driven test queries."""

    def __init__(self, utils):
        self.utils = utils
        self.config = utils.config
        self.logger = utils.logger

    def verify(self, vectorstore: "FAISS") -> bool:
        """Return True when the store is non-empty and every test query
        retrieves at least one result; log details for each query."""
        self.logger.info("\n" + "="*60)
        self.logger.info("VERIFICATION")
        self.logger.info("="*60)

        total_vectors = vectorstore.index.ntotal
        self.logger.info(f"Total vectors: {total_vectors:,}")

        if total_vectors == 0:
            self.logger.error("❌ Vector store is empty!")
            return False

        # Queries are configurable; defaults cover the core corpus themes.
        test_queries = self.config.get('evaluation.test_queries', [
            "What is Karma Yoga?",
            "How can I overcome fear?",
            "What is the purpose of meditation?"
        ])

        top_k = self.config.get('rag.retrieval.top_k', 5)

        success = True
        for query in test_queries:
            self.logger.info(f"\n🔍 Test query: '{query}'")

            try:
                results = vectorstore.similarity_search_with_score(query, k=top_k)

                if results:
                    doc, score = results[0]
                    self.logger.info(f"  ✅ Best match score: {score:.4f}")
                    self.logger.info(f"  📖 Source: {doc.metadata.get('source', 'Unknown')}")
                    self.logger.info(f"  📝 Preview: {doc.page_content[:100]}...")
                else:
                    self.logger.warning("  ⚠️ No results found")
                    success = False

            except Exception as e:
                self.logger.error(f"  ❌ Query failed: {e}")
                success = False

        if success:
            self.logger.info("\n✅ Verification completed successfully")
        else:
            self.logger.warning("\n⚠️ Verification completed with warnings")

        return success
|
|
| |
| |
| |
|
|
class DatasetStatistics:
    """Calculate and log dataset statistics (documents, work types, chunks)."""

    def __init__(self, utils):
        self.utils = utils
        self.logger = utils.logger

    def calculate(self, documents: List["Document"], chunks: List["Document"]):
        """Log aggregate statistics; reads only metadata and page_content."""
        self.logger.info("\n" + "="*70)
        self.logger.info("DATASET STATISTICS")
        self.logger.info("="*70)

        doc_types = {}
        work_types = {}
        total_chars = 0
        ocr_count = 0

        for doc in documents:
            doc_type = doc.metadata.get('type', 'unknown')
            doc_types[doc_type] = doc_types.get(doc_type, 0) + 1

            work_type = doc.metadata.get('work_type', 'unknown')
            work_types[work_type] = work_types.get(work_type, 0) + 1

            total_chars += len(doc.page_content) if doc.page_content else 0

            if doc.metadata.get('ocr', False):
                ocr_count += 1

        self.logger.info(f"\n📊 Documents:")
        self.logger.info(f"  Total: {len(documents)}")
        self.logger.info(f"  By type:")
        for dtype, count in sorted(doc_types.items()):
            self.logger.info(f"    - {dtype}: {count}")

        if ocr_count > 0:
            self.logger.info(f"  OCR applied: {ocr_count} pages")

        self.logger.info(f"\n📚 Work types:")
        for wtype, count in sorted(work_types.items()):
            self.logger.info(f"    - {wtype}: {count}")

        self.logger.info(f"\n📝 Content:")
        self.logger.info(f"  Total characters: {total_chars:,}")
        # Rough heuristic: ~5 characters per word.
        self.logger.info(f"  Estimated words: {total_chars//5:,}")

        self.logger.info(f"\n🧩 Chunks:")
        self.logger.info(f"  Total chunks: {len(chunks)}")

        if chunks:
            chunk_sizes = [len(chunk.page_content) for chunk in chunks if chunk.page_content]
            if chunk_sizes:
                avg_size = sum(chunk_sizes) / len(chunk_sizes)
                self.logger.info(f"  Average chunk size: {avg_size:.0f} characters")
                self.logger.info(f"  Min chunk size: {min(chunk_sizes)}")
                self.logger.info(f"  Max chunk size: {max(chunk_sizes)}")
|
|
| |
| |
| |
|
|
def main():
    """Main execution flow: load -> NLP -> chunk -> embed -> save -> verify.

    Returns a process exit code: 0 on success, 1 on any failure/interrupt.
    """
    utils = get_utils()
    logger = utils.logger

    logger.info("="*70)
    logger.info("🕉️ VIVEKANANDA AI - VECTOR DATABASE CREATION")
    logger.info("="*70)

    try:
        logger.info("\n🔧 Initializing components...")
        nlp_processor = create_nlp_processor(utils.config, logger)
        document_loader = DocumentLoader(utils)
        text_processor = TextProcessor(utils, nlp_processor)
        embedding_creator = EmbeddingCreator(utils)
        verifier = VectorStoreVerifier(utils)
        stats = DatasetStatistics(utils)

        # Step 1: gather documents from every configured source.
        logger.info("\n" + "="*70)
        logger.info("STEP 1: LOADING DOCUMENTS")
        logger.info("="*70)

        documents = document_loader.load_all_documents()

        if not documents:
            logger.error("\n❌ No documents loaded! Please add files to data directory.")
            logger.info("\n📁 Expected locations:")
            logger.info(f"  - PDFs: {utils.config.get_path('paths', 'data', 'raw')}")
            logger.info(f"  - Text: {utils.config.get_path('paths', 'data', 'extracted')}")
            logger.info(f"  - Markdown: {utils.config.get_path('paths', 'data', 'markdown')}")
            logger.info(f"  - JSON: {utils.config.get_path('paths', 'data', 'processed')}/{utils.config.get('dataset.json.file')}")
            return 1

        logger.info(f"\n✅ Loaded {len(documents)} total documents")

        # Step 2: NLP preprocessing (best-effort; failures pass through).
        logger.info("\n" + "="*70)
        logger.info("STEP 2: NLP PROCESSING")
        logger.info("="*70)

        processed_docs = text_processor.process_documents(documents)

        # Step 3: split into retrieval-sized chunks.
        logger.info("\n" + "="*70)
        logger.info("STEP 3: CHUNKING")
        logger.info("="*70)

        chunks = text_processor.chunk_documents(processed_docs)

        if not chunks:
            logger.error("\n❌ No chunks created!")
            return 1

        stats.calculate(documents, chunks)

        # Step 4: embeddings (with TF-IDF fallback when HF is unavailable).
        logger.info("\n" + "="*70)
        logger.info("STEP 4: CREATING EMBEDDINGS")
        logger.info("="*70)

        embeddings_model = embedding_creator.create_embeddings_model()

        if embeddings_model is None:
            logger.warning("⚠️ Embeddings model unavailable. Creating TF-IDF fallback artifacts for retrieval.")
            embedding_creator.create_fallback_tfidf_index(chunks)
            vectorstore = None
        else:
            vectorstore = embedding_creator.create_vector_store(chunks, embeddings_model)

        # Step 5: persist the FAISS index (skipped in fallback mode).
        logger.info("\n" + "="*70)
        logger.info("STEP 5: SAVING")
        logger.info("="*70)

        if vectorstore is not None:
            embedding_creator.save_vector_store(vectorstore)

        # Verification only applies to a real vector store; the TF-IDF
        # fallback path counts as success here.
        success = True
        if vectorstore is not None:
            success = verifier.verify(vectorstore)

        logger.info("\n" + "="*70)
        if success:
            logger.info("✅ SUCCESS! VECTOR DATABASE READY")
        else:
            logger.info("⚠️ COMPLETED WITH WARNINGS")
        logger.info("="*70)

        logger.info(f"\n📊 Summary:")
        logger.info(f"  Total documents: {len(documents)}")
        logger.info(f"  Total chunks: {len(chunks)}")
        logger.info(f"  Vector store: {utils.config.get_path('paths', 'vectorstore', 'root')} (or TF-IDF fallback texts)")

        logger.info(f"\n🚀 Next steps:")
        logger.info(f"  1. Test retrieval: python scripts/02_query_rag.py")
        logger.info(f"  2. Test model: python scripts/03_test_mistral.py")
        logger.info(f"  3. Run Streamlit: streamlit run app.py")

        return 0 if success else 1

    except KeyboardInterrupt:
        logger.warning("\n⚠️ Interrupted by user")
        return 1
    except Exception as e:
        logger.error(f"\n❌ FATAL ERROR: {e}", exc_info=True)
        return 1
    finally:
        # Always release memory (GPU/CPU caches), even on failure.
        utils.optimize_memory()
|
|
# Script entry point: propagate main()'s exit code (0/1) to the shell.
if __name__ == "__main__":
    sys.exit(main())