# VivekanandaAI / 01_embed_data.py
# NOTE(review): the lines below were web-page residue from the Hugging Face file
# viewer ("jyotirmoy05 — Upload 18 files — commit 3c15254"); kept as a comment so
# the module remains valid Python.
"""
Create Vector Database with Embeddings and RAG
Enhanced with OCR for scanned PDFs and Markdown support
Pure LangChain implementation, config-driven, no hardcoding
"""
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parents[1]))
from typing import List, Dict, Any, Optional
import json
import io
# LangChain imports
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.document_loaders import (
PyMuPDFLoader,
TextLoader,
DirectoryLoader,
UnstructuredMarkdownLoader
)
# OCR and image utilities — all optional. Import each one independently and
# derive the HAS_* availability flags afterwards, so the loader can degrade
# gracefully when any dependency is missing.
try:
    import fitz  # PyMuPDF
except ImportError:
    fitz = None
try:
    from PIL import Image
except ImportError:
    Image = None
try:
    import pytesseract
except ImportError:
    pytesseract = None

# Availability flags consumed by DocumentLoader._check_ocr_capabilities.
HAS_PYMUPDF = fitz is not None
HAS_PIL = Image is not None
HAS_TESSERACT = pytesseract is not None
# Local imports
from utils import get_utils
from nlp_processor import create_nlp_processor
# ============================================================================
# DOCUMENT LOADER WITH ENHANCED PDF & MARKDOWN SUPPORT
# ============================================================================
class DocumentLoader:
    """Load documents from various sources (PDF, text, markdown, JSON Q&A)
    with an OCR fallback for scanned PDF pages.

    All behavior is config-driven; `utils` is expected to expose `config`,
    `logger` and `file_handler` (project-local helpers).
    """

    def __init__(self, utils):
        self.utils = utils
        self.config = utils.config
        self.logger = utils.logger
        self.file_handler = utils.file_handler
        # Resolve OCR availability once at construction time.
        self.ocr_enabled = self._check_ocr_capabilities()

    def _check_ocr_capabilities(self) -> bool:
        """Return True iff OCR is enabled in config AND all dependencies
        (PyMuPDF, Pillow, pytesseract + the tesseract binary) are present."""
        if not self.config.get('ocr.enabled', False):
            return False
        if not all([HAS_PYMUPDF, HAS_PIL, HAS_TESSERACT]):
            missing = []
            if not HAS_PYMUPDF:
                missing.append('PyMuPDF')
            if not HAS_PIL:
                missing.append('Pillow')
            if not HAS_TESSERACT:
                missing.append('pytesseract')
            self.logger.warning(f"OCR disabled: Missing dependencies: {', '.join(missing)}")
            self.logger.info("Install with: pip install pymupdf pillow pytesseract")
            return False
        # The pytesseract wrapper can be installed without the tesseract
        # binary; probe the binary explicitly.
        try:
            pytesseract.get_tesseract_version()
            self.logger.info("โœ… OCR enabled (Tesseract available)")
            return True
        except Exception:
            self.logger.warning("OCR disabled: Tesseract not found. Install: brew install tesseract (Mac) or apt-get install tesseract-ocr (Linux)")
            return False

    def load_pdf_files(self) -> List[Document]:
        """Load all PDF files with OCR fallback for scanned documents."""
        raw_path = self.config.get_path('paths', 'data', 'raw')
        pdf_files = []
        if raw_path and raw_path.exists():
            # Direct files in the raw directory
            pdf_files.extend(self.file_handler.get_files_by_extension(raw_path, ['.pdf']))
            # Files found in subdirectories
            for subdir in raw_path.rglob('*'):
                if subdir.is_dir() and subdir != raw_path:
                    pdf_files.extend(self.file_handler.get_files_by_extension(subdir, ['.pdf']))
        # FIX: de-duplicate while preserving order — if get_files_by_extension
        # itself recurses, the per-subdirectory pass above would collect the
        # same file more than once and it would be embedded twice.
        pdf_files = list(dict.fromkeys(pdf_files))
        self.logger.info(f"๐Ÿ“š Found {len(pdf_files)} PDF files")
        if not pdf_files:
            self.logger.warning(f"No PDF files found in {raw_path}")
            return []
        all_docs = []
        ocr_count = 0
        for pdf_path in pdf_files:
            try:
                self.logger.info(f"๐Ÿ“„ Loading: {pdf_path.name}")
                # PyMuPDF-based loader yields one Document per page.
                loader = PyMuPDFLoader(str(pdf_path))
                docs = loader.load()
                for doc in docs:
                    doc.metadata.update({
                        'source': pdf_path.name,
                        'file_path': str(pdf_path),
                        'type': 'pdf'
                    })
                    # Enrich with Vivekananda-specific metadata (volume, work type, ...)
                    self._add_vivekananda_metadata(doc, pdf_path.name)
                    # OCR fallback for scanned/empty pages
                    if self._should_apply_ocr(doc):
                        try:
                            ocr_text = self._ocr_page(pdf_path, doc.metadata.get('page', 0))
                            # Only accept OCR output that looks substantive.
                            if ocr_text and len(ocr_text.strip()) > 50:
                                doc.page_content = ocr_text
                                doc.metadata['ocr'] = True
                                ocr_count += 1
                                self.logger.info(f" โœ“ OCR applied to page {doc.metadata.get('page', '?')}")
                        except Exception as ocr_err:
                            self.logger.warning(f" โš ๏ธ OCR failed on page {doc.metadata.get('page', '?')}: {ocr_err}")
                all_docs.extend(docs)
                self.logger.info(f" โœ… Loaded {len(docs)} pages")
            except Exception as e:
                # Best-effort: skip unreadable PDFs rather than aborting the run.
                self.logger.error(f" โŒ Failed to load {pdf_path.name}: {e}")
                continue
        if ocr_count > 0:
            self.logger.info(f"๐Ÿ“ธ OCR applied to {ocr_count} pages")
        return all_docs

    def _should_apply_ocr(self, doc: Document) -> bool:
        """Return True when a page looks scanned (too little extracted text)
        and has not already been OCR'd."""
        if not self.ocr_enabled:
            return False
        if doc.metadata.get('ocr', False):
            return False
        min_length = self.config.get('ocr.min_text_length', 50)
        content_length = len(doc.page_content.strip()) if doc.page_content else 0
        return content_length < min_length

    def _ocr_page(self, pdf_path: Path, page_index: int) -> str:
        """OCR a single PDF page (or every page when `page_index` is out of
        range) using PyMuPDF rasterization + Tesseract. Returns '' on error."""
        if not all([HAS_PYMUPDF, HAS_PIL, HAS_TESSERACT]):
            raise RuntimeError("OCR dependencies not available")
        lang = self.config.get('ocr.lang', 'eng')
        dpi = self.config.get('ocr.dpi', 300)
        # Allow an explicit tesseract binary path from config.
        tesseract_cmd = self.config.get('ocr.tesseract_cmd')
        if tesseract_cmd:
            pytesseract.pytesseract.tesseract_cmd = tesseract_cmd
        try:
            with fitz.open(str(pdf_path)) as pdf_doc:
                if page_index < 0 or page_index >= pdf_doc.page_count:
                    # Out-of-range index: OCR every page and join the results.
                    texts = []
                    for i in range(pdf_doc.page_count):
                        page = pdf_doc.load_page(i)
                        pix = page.get_pixmap(dpi=dpi)
                        img = Image.open(io.BytesIO(pix.tobytes("png")))
                        text = pytesseract.image_to_string(img, lang=lang)
                        if text:
                            texts.append(text)
                    return "\n\n".join(texts)
                # OCR the specific page.
                page = pdf_doc.load_page(page_index)
                pix = page.get_pixmap(dpi=dpi)
                img = Image.open(io.BytesIO(pix.tobytes("png")))
                text = pytesseract.image_to_string(img, lang=lang)
                return text or ""
        except Exception as e:
            self.logger.error(f"OCR error: {e}")
            return ""

    def load_text_files(self) -> List[Document]:
        """Load all plain-text files from the extracted-data directory."""
        text_extensions = self.config.get('dataset.text.file_extensions', ['.txt', '.md'])
        extracted_path = self.config.get_path('paths', 'data', 'extracted')
        if not extracted_path or not extracted_path.exists():
            self.logger.warning(f"Text directory not found: {extracted_path}")
            return []
        text_files = self.file_handler.get_files_by_extension(extracted_path, text_extensions)
        self.logger.info(f"๐Ÿ“ Found {len(text_files)} text files")
        all_docs = []
        for text_path in text_files:
            try:
                self.logger.info(f"๐Ÿ“„ Loading: {text_path.name}")
                encoding = self.config.get('dataset.text.encoding', 'utf-8')
                loader = TextLoader(str(text_path), encoding=encoding)
                docs = loader.load()
                for doc in docs:
                    doc.metadata.update({
                        'source': text_path.name,
                        'file_path': str(text_path),
                        'type': 'text'
                    })
                    self._add_vivekananda_metadata(doc, text_path.name)
                all_docs.extend(docs)
                self.logger.info(f" โœ… Loaded {len(docs)} document(s)")
            except Exception as e:
                self.logger.error(f" โŒ Failed to load {text_path.name}: {e}")
        return all_docs

    def load_markdown_files(self) -> List[Document]:
        """Load markdown files with proper parsing from all configured locations."""
        # Multiple candidate locations; they may overlap or nest.
        markdown_paths = [
            self.config.get_path('paths', 'data', 'markdown'),
            self.config.get_path('paths', 'data', 'raw'),
            self.config.get_path('paths', 'data', 'extracted')
        ]
        all_docs = []
        # FIX: track resolved paths so a file visible from more than one
        # configured directory (nested/duplicate paths) is loaded only once.
        seen_files = set()
        for md_path in markdown_paths:
            if not md_path or not md_path.exists():
                continue
            md_files = []
            for candidate in md_path.rglob('*.md'):
                resolved = candidate.resolve()
                if resolved not in seen_files:
                    seen_files.add(resolved)
                    md_files.append(candidate)
            if not md_files:
                continue
            self.logger.info(f"๐Ÿ“ Found {len(md_files)} markdown files in {md_path.name}")
            for md_file in md_files:
                try:
                    self.logger.info(f"๐Ÿ“„ Loading: {md_file.name}")
                    try:
                        loader = UnstructuredMarkdownLoader(str(md_file))
                        docs = loader.load()
                    # FIX: was a bare `except:` which also swallowed
                    # KeyboardInterrupt/SystemExit.
                    except Exception:
                        # Fallback to plain text loading
                        encoding = self.config.get('dataset.text.encoding', 'utf-8')
                        loader = TextLoader(str(md_file), encoding=encoding)
                        docs = loader.load()
                    for doc in docs:
                        doc.metadata.update({
                            'source': md_file.name,
                            'file_path': str(md_file),
                            'type': 'markdown'
                        })
                        self._add_vivekananda_metadata(doc, md_file.name)
                    all_docs.extend(docs)
                    self.logger.info(f" โœ… Loaded {len(docs)} document(s)")
                except Exception as e:
                    self.logger.error(f" โŒ Failed to load {md_file.name}: {e}")
        if all_docs:
            self.logger.info(f"๐Ÿ“š Total markdown documents: {len(all_docs)}")
        else:
            self.logger.warning("No markdown files found")
        return all_docs

    def load_json_dataset(self) -> List[Document]:
        """Load Q&A pairs from the configured JSON file as documents."""
        json_file = self.config.get('dataset.json.file')
        processed_path = self.config.get_path('paths', 'data', 'processed')
        # FIX: guard against unconfigured path/filename — `None / None` would
        # previously raise a TypeError instead of skipping this source.
        if not json_file or not processed_path:
            self.logger.warning("JSON dataset not configured (missing path or filename)")
            return []
        json_path = processed_path / json_file
        if not json_path.exists():
            self.logger.warning(f"JSON dataset not found: {json_path}")
            return []
        self.logger.info(f"๐Ÿ“Š Loading JSON dataset: {json_path.name}")
        data = self.file_handler.load_json(json_path)
        if not data:
            self.logger.error(f"Failed to load or empty JSON file: {json_path}")
            return []
        # Field names are configurable; fall back to conventional defaults.
        fields = self.config.get('dataset.json.fields', {})
        instruction_field = fields.get('instruction', 'instruction')
        response_field = fields.get('response', 'response')
        source_field = fields.get('source', 'source')
        work_type_field = fields.get('work_type', 'work_type')
        topic_field = fields.get('topic', 'topic')
        docs = []
        for idx, item in enumerate(data):
            try:
                instruction = item.get(instruction_field, '')
                response = item.get(response_field, '')
                if not instruction or not response:
                    self.logger.warning(f" Skipping item {idx}: missing instruction or response")
                    continue
                # Combine question and answer into one retrievable context.
                content = f"Question: {instruction}\n\nAnswer: {response}"
                doc = Document(
                    page_content=content,
                    metadata={
                        'instruction': instruction,
                        'response': response,
                        'source': item.get(source_field, 'JSON Dataset'),
                        'work_type': item.get(work_type_field, 'Q&A'),
                        'topic': item.get(topic_field, 'general'),
                        'type': 'qa_pair',
                        'index': idx
                    }
                )
                docs.append(doc)
            except Exception as e:
                self.logger.warning(f" Error processing item {idx}: {e}")
                continue
        self.logger.info(f" โœ… Loaded {len(docs)} Q&A pairs")
        return docs

    def _add_vivekananda_metadata(self, doc: Document, filename: str):
        """Derive Vivekananda-specific metadata (volume, work_type, special)
        from the filename and the first 1000 characters of content."""
        import re
        filename_lower = filename.lower()
        text_sample = doc.page_content[:1000].lower() if doc.page_content else ""
        # Volume number, e.g. "volume_3" / "volume 3" / "volume-3".
        volume_match = re.search(r'volume[_\s-]?(\d+)', filename_lower)
        if volume_match:
            doc.metadata['volume'] = int(volume_match.group(1))
        # Work type: first keyword hit wins (dict order is significant).
        work_type_map = {
            'karma': 'Karma-Yoga',
            'raja': 'Raja-Yoga',
            'bhakti': 'Bhakti-Yoga',
            'jnana': 'Jnana-Yoga',
            'letter': 'Letters',
            'speech': 'Speeches',
            'address': 'Speeches',
            'inspired': 'Inspired Talks',
            'talk': 'Talks',
            'lecture': 'Lectures'
        }
        for key, value in work_type_map.items():
            if key in filename_lower or key in text_sample:
                doc.metadata['work_type'] = value
                break
        else:
            # No keyword matched: only default if nothing set it earlier.
            if 'work_type' not in doc.metadata:
                doc.metadata['work_type'] = 'General'
        # Well-known works get a 'special' tag (content-based only).
        special_keywords = {
            'chicago': 'Chicago Address',
            'parliament': 'Parliament Speech',
            'colombo': 'Colombo to Almora',
            'almora': 'Colombo to Almora'
        }
        for keyword, special_name in special_keywords.items():
            if keyword in text_sample:
                doc.metadata['special'] = special_name
                break

    def load_all_documents(self) -> List[Document]:
        """Load documents from every source enabled in `dataset.load`."""
        all_docs = []
        load_cfg = self.config.get('dataset.load', {
            'pdf': True,
            'text': True,
            'markdown': True,
            'json': True
        })
        if load_cfg.get('pdf', True):
            self.logger.info("\n" + "="*60)
            self.logger.info("LOADING PDF FILES")
            self.logger.info("="*60)
            pdf_docs = self.load_pdf_files()
            all_docs.extend(pdf_docs)
        if load_cfg.get('text', True):
            self.logger.info("\n" + "="*60)
            self.logger.info("LOADING TEXT FILES")
            self.logger.info("="*60)
            text_docs = self.load_text_files()
            all_docs.extend(text_docs)
        if load_cfg.get('markdown', True):
            self.logger.info("\n" + "="*60)
            self.logger.info("LOADING MARKDOWN FILES")
            self.logger.info("="*60)
            markdown_docs = self.load_markdown_files()
            all_docs.extend(markdown_docs)
        if load_cfg.get('json', True):
            self.logger.info("\n" + "="*60)
            self.logger.info("LOADING JSON DATASET")
            self.logger.info("="*60)
            json_docs = self.load_json_dataset()
            all_docs.extend(json_docs)
        return all_docs
# ============================================================================
# TEXT PROCESSOR
# ============================================================================
class TextProcessor:
    """Run NLP preprocessing and chunking over loaded documents."""

    def __init__(self, utils, nlp_processor):
        self.utils = utils
        self.config = utils.config
        self.logger = utils.logger
        self.nlp_processor = nlp_processor

    def process_documents(self, documents: List[Document]) -> List[Document]:
        """Process documents with NLP pipeline"""
        self.logger.info("Processing documents with NLP...")
        processed_docs = []
        for idx, original in enumerate(documents):
            try:
                cleaned = self.nlp_processor.preprocess_text(original.page_content)
                replacement = Document(
                    page_content=cleaned,
                    metadata=original.metadata.copy()
                )
            except Exception as e:
                # On failure, fall back to the untouched document.
                self.logger.warning(f"Failed to process document {idx}: {e}")
                replacement = original
            processed_docs.append(replacement)
        self.logger.info(f"โœ… Processed {len(processed_docs)} documents")
        return processed_docs

    def chunk_documents(self, documents: List[Document]) -> List[Document]:
        """Chunk documents using config-driven RecursiveCharacterTextSplitter"""
        self.logger.info("Chunking documents...")
        cfg = self.config
        # All splitter knobs come from config, with sensible defaults.
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=cfg.get('embeddings.chunk.size', 500),
            chunk_overlap=cfg.get('embeddings.chunk.overlap', 50),
            length_function=len,
            separators=cfg.get('embeddings.chunk.separators',
                               ["\n\n", "\n", ". ", "! ", "? ", "; ", " ", ""]),
            keep_separator=True,
        )
        chunks = splitter.split_documents(documents)
        # Tag each chunk with its position and size statistics.
        for idx, chunk in enumerate(chunks):
            text = chunk.page_content
            chunk.metadata.update(
                chunk_index=idx,
                char_count=len(text),
                word_count=len(text.split()),
            )
        self.logger.info(f"โœ… Created {len(chunks)} chunks")
        return chunks
# ============================================================================
# EMBEDDING CREATOR
# ============================================================================
class EmbeddingCreator:
    """Create embeddings and a FAISS vector store via LangChain."""

    def __init__(self, utils):
        self.utils = utils
        self.config = utils.config
        self.logger = utils.logger
        self.device = utils.device_manager.device

    def create_embeddings_model(self) -> Optional[HuggingFaceEmbeddings]:
        """Build the HuggingFace embeddings model described by the config;
        returns None when disabled or when construction fails."""
        if not self.config.get('embeddings.use_hf', True):
            self.logger.warning("HuggingFace embeddings disabled in config")
            return None
        model_name = self.config.get('embeddings.model_name')
        normalize = self.config.get('embeddings.normalize', True)
        batch_size = self.config.get('embeddings.batch_size', 32)
        self.logger.info(f"Creating embeddings model: {model_name}")
        self.logger.info(f"Device: {self.device}")
        try:
            model = HuggingFaceEmbeddings(
                model_name=model_name,
                model_kwargs={'device': self.device},
                encode_kwargs={
                    'batch_size': batch_size,
                    'normalize_embeddings': normalize,
                },
            )
            self.logger.info("โœ… Embeddings model created")
            return model
        except Exception as e:
            self.logger.error(f"Failed to create embeddings model: {e}")
            return None

    def create_fallback_tfidf_index(self, chunks: List[Document]):
        """Persist raw chunk texts so a TF-IDF retrieval fallback can be built
        when no embeddings model is available."""
        texts = [chunk.page_content for chunk in chunks if chunk.page_content]
        root = self.config.get_path('paths', 'vectorstore', 'root')
        fallback_dir = root / 'faiss_index'
        fallback_dir.mkdir(parents=True, exist_ok=True)
        target = fallback_dir / 'texts.json'
        target.write_text(json.dumps(texts, ensure_ascii=False), encoding='utf-8')
        self.logger.info(f"โœ… Saved TF-IDF fallback texts: {fallback_dir}")
        return fallback_dir

    def create_vector_store(self, chunks: List[Document], embeddings: HuggingFaceEmbeddings) -> FAISS:
        """Embed all chunks into a new FAISS store; re-raises on failure."""
        self.logger.info("Creating FAISS vector store...")
        try:
            store = FAISS.from_documents(documents=chunks, embedding=embeddings)
            self.logger.info(f"โœ… Vector store created with {store.index.ntotal:,} vectors")
            return store
        except Exception as e:
            self.logger.error(f"Failed to create vector store: {e}")
            raise

    def save_vector_store(self, vectorstore: FAISS):
        """Persist the FAISS store under the configured directory; re-raises on failure."""
        root = self.config.get_path('paths', 'vectorstore', 'root')
        save_path = root / self.config.get('paths.vectorstore.db_name')
        save_path.mkdir(parents=True, exist_ok=True)
        self.logger.info(f"Saving vector store to: {save_path}")
        try:
            vectorstore.save_local(str(save_path))
        except Exception as e:
            self.logger.error(f"Failed to save vector store: {e}")
            raise
        else:
            self.logger.info("โœ… Vector store saved successfully")
# ============================================================================
# VERIFICATION
# ============================================================================
class VectorStoreVerifier:
    """Sanity-check a vector store with configured test queries."""

    def __init__(self, utils):
        self.utils = utils
        self.config = utils.config
        self.logger = utils.logger

    def verify(self, vectorstore: FAISS) -> bool:
        """Verify vector store with test queries"""
        log = self.logger
        log.info("\n" + "="*60)
        log.info("VERIFICATION")
        log.info("="*60)
        # An empty index is an immediate failure.
        total_vectors = vectorstore.index.ntotal
        log.info(f"Total vectors: {total_vectors:,}")
        if total_vectors == 0:
            log.error("โŒ Vector store is empty!")
            return False
        default_queries = [
            "What is Karma Yoga?",
            "How can I overcome fear?",
            "What is the purpose of meditation?"
        ]
        test_queries = self.config.get('evaluation.test_queries', default_queries)
        top_k = self.config.get('rag.retrieval.top_k', 5)
        success = True
        for query in test_queries:
            log.info(f"\n๐Ÿ” Test query: '{query}'")
            try:
                results = vectorstore.similarity_search_with_score(query, k=top_k)
                if results:
                    doc, score = results[0]
                    log.info(f" โœ… Best match score: {score:.4f}")
                    log.info(f" ๐Ÿ“– Source: {doc.metadata.get('source', 'Unknown')}")
                    log.info(f" ๐Ÿ“„ Preview: {doc.page_content[:100]}...")
                else:
                    log.warning(f" โš ๏ธ No results found")
                    success = False
            except Exception as e:
                log.error(f" โŒ Query failed: {e}")
                success = False
        if success:
            log.info("\nโœ… Verification completed successfully")
        else:
            log.warning("\nโš ๏ธ Verification completed with warnings")
        return success
# ============================================================================
# STATISTICS
# ============================================================================
class DatasetStatistics:
    """Compute and log summary statistics for documents and chunks."""

    def __init__(self, utils):
        self.utils = utils
        self.logger = utils.logger

    def calculate(self, documents: List[Document], chunks: List[Document]):
        """Calculate comprehensive statistics"""
        log = self.logger
        log.info("\n" + "="*70)
        log.info("DATASET STATISTICS")
        log.info("="*70)
        # Tally per-type counts and totals in a single pass.
        doc_types = {}
        work_types = {}
        total_chars = 0
        ocr_count = 0
        for doc in documents:
            dtype = doc.metadata.get('type', 'unknown')
            doc_types[dtype] = doc_types.get(dtype, 0) + 1
            wtype = doc.metadata.get('work_type', 'unknown')
            work_types[wtype] = work_types.get(wtype, 0) + 1
            if doc.page_content:
                total_chars += len(doc.page_content)
            if doc.metadata.get('ocr', False):
                ocr_count += 1
        log.info(f"\n๐Ÿ“Š Documents:")
        log.info(f" Total: {len(documents)}")
        log.info(f" By type:")
        for dtype, count in sorted(doc_types.items()):
            log.info(f" - {dtype}: {count}")
        if ocr_count > 0:
            log.info(f" OCR applied: {ocr_count} pages")
        log.info(f"\n๐Ÿ“š Work types:")
        for wtype, count in sorted(work_types.items()):
            log.info(f" - {wtype}: {count}")
        log.info(f"\n๐Ÿ“ Content:")
        log.info(f" Total characters: {total_chars:,}")
        log.info(f" Estimated words: {total_chars//5:,}")
        log.info(f"\n๐Ÿงฉ Chunks:")
        log.info(f" Total chunks: {len(chunks)}")
        if chunks:
            chunk_sizes = [len(c.page_content) for c in chunks if c.page_content]
            if chunk_sizes:
                avg_size = sum(chunk_sizes) / len(chunk_sizes)
                log.info(f" Average chunk size: {avg_size:.0f} characters")
                log.info(f" Min chunk size: {min(chunk_sizes)}")
                log.info(f" Max chunk size: {max(chunk_sizes)}")
# ============================================================================
# MAIN EXECUTION
# ============================================================================
def main():
    """Run the full pipeline: load -> NLP process -> chunk -> embed -> save -> verify.

    Returns:
        int: process exit code — 0 on success, 1 on failure, interrupt,
        or empty input. Falls back to TF-IDF artifacts when no embeddings
        model can be created.
    """
    # Initialize utilities (project-local singleton: config, logger, device, ...)
    utils = get_utils()
    logger = utils.logger
    logger.info("="*70)
    logger.info("๐Ÿ•‰๏ธ VIVEKANANDA AI - VECTOR DATABASE CREATION")
    logger.info("="*70)
    try:
        # Initialize components
        logger.info("\n๐Ÿ”ง Initializing components...")
        nlp_processor = create_nlp_processor(utils.config, logger)
        document_loader = DocumentLoader(utils)
        text_processor = TextProcessor(utils, nlp_processor)
        embedding_creator = EmbeddingCreator(utils)
        verifier = VectorStoreVerifier(utils)
        stats = DatasetStatistics(utils)
        # Step 1: Load documents from every enabled source (PDF/text/md/JSON)
        logger.info("\n" + "="*70)
        logger.info("STEP 1: LOADING DOCUMENTS")
        logger.info("="*70)
        documents = document_loader.load_all_documents()
        if not documents:
            # Nothing to embed — tell the user where files are expected.
            logger.error("\nโŒ No documents loaded! Please add files to data directory.")
            logger.info("\n๐Ÿ“ Expected locations:")
            logger.info(f" - PDFs: {utils.config.get_path('paths', 'data', 'raw')}")
            logger.info(f" - Text: {utils.config.get_path('paths', 'data', 'extracted')}")
            logger.info(f" - Markdown: {utils.config.get_path('paths', 'data', 'markdown')}")
            logger.info(f" - JSON: {utils.config.get_path('paths', 'data', 'processed')}/{utils.config.get('dataset.json.file')}")
            return 1
        logger.info(f"\nโœ… Loaded {len(documents)} total documents")
        # Step 2: NLP preprocessing (best-effort; failures keep the original doc)
        logger.info("\n" + "="*70)
        logger.info("STEP 2: NLP PROCESSING")
        logger.info("="*70)
        processed_docs = text_processor.process_documents(documents)
        # Step 3: Split processed documents into retrieval-sized chunks
        logger.info("\n" + "="*70)
        logger.info("STEP 3: CHUNKING")
        logger.info("="*70)
        chunks = text_processor.chunk_documents(processed_docs)
        if not chunks:
            logger.error("\nโŒ No chunks created!")
            return 1
        # Show statistics (uses the pre-NLP documents for doc-level stats)
        stats.calculate(documents, chunks)
        # Step 4: Create embeddings; None means disabled or model load failure
        logger.info("\n" + "="*70)
        logger.info("STEP 4: CREATING EMBEDDINGS")
        logger.info("="*70)
        embeddings_model = embedding_creator.create_embeddings_model()
        if embeddings_model is None:
            # Degraded mode: persist raw texts for TF-IDF-only retrieval.
            logger.warning("โš ๏ธ Embeddings model unavailable. Creating TF-IDF fallback artifacts for retrieval.")
            embedding_creator.create_fallback_tfidf_index(chunks)
            vectorstore = None
        else:
            vectorstore = embedding_creator.create_vector_store(chunks, embeddings_model)
        # Step 5: Save vector store (skipped in TF-IDF fallback mode)
        logger.info("\n" + "="*70)
        logger.info("STEP 5: SAVING")
        logger.info("="*70)
        if vectorstore is not None:
            embedding_creator.save_vector_store(vectorstore)
        # Step 6: Verify with test queries (fallback mode counts as success)
        success = True
        if vectorstore is not None:
            success = verifier.verify(vectorstore)
        # Final summary
        logger.info("\n" + "="*70)
        if success:
            logger.info("โœ… SUCCESS! VECTOR DATABASE READY")
        else:
            logger.info("โš ๏ธ COMPLETED WITH WARNINGS")
        logger.info("="*70)
        logger.info(f"\n๐Ÿ“Š Summary:")
        logger.info(f" Total documents: {len(documents)}")
        logger.info(f" Total chunks: {len(chunks)}")
        logger.info(f" Vector store: {utils.config.get_path('paths', 'vectorstore', 'root')} (or TF-IDF fallback texts)")
        logger.info(f"\n๐Ÿš€ Next steps:")
        logger.info(f" 1. Test retrieval: python scripts/02_query_rag.py")
        logger.info(f" 2. Test model: python scripts/03_test_mistral.py")
        logger.info(f" 3. Run Streamlit: streamlit run app.py")
        return 0 if success else 1
    except KeyboardInterrupt:
        logger.warning("\nโš ๏ธ Interrupted by user")
        return 1
    except Exception as e:
        logger.error(f"\nโŒ FATAL ERROR: {e}", exc_info=True)
        return 1
    finally:
        # Cleanup: release memory regardless of outcome
        utils.optimize_memory()
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code.
    raise SystemExit(main())