# Source: bfh-studadmin-assist / scripts/ingest_documents.py
# (commit 5df4a2a by awellis — "Implement modular RAG email assistant architecture")
#!/usr/bin/env python3
"""Script to ingest and index markdown documents."""
import sys
import logging
from pathlib import Path
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.config import get_config
from src.document_processing.loader import MarkdownDocumentLoader
from src.document_processing.chunker import SemanticChunker
from src.indexing.opensearch_client import OpenSearchClient
from src.indexing.indexer import DocumentIndexer
def setup_logging() -> None:
    """Initialise root logging at INFO level with a timestamped format."""
    log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)
def main():
    """Run the full document ingestion workflow.

    Steps: configure logging, load config, connect to OpenSearch,
    create (or interactively recreate) the target index, load and chunk
    the markdown documents, then index the chunks.

    Exits with status 1 if OpenSearch is unreachable or no documents
    could be loaded.
    """
    setup_logging()
    logger = logging.getLogger(__name__)
    logger.info("Starting document ingestion process...")

    # Load configuration
    config = get_config()
    logger.info("Using documents path: %s", config.document_processing.documents_path)
    logger.info("Target index: %s", config.opensearch.index_name)

    # Initialize OpenSearch client
    logger.info("Connecting to OpenSearch...")
    os_client = OpenSearchClient(config.opensearch)
    if not os_client.ping():
        logger.error("Failed to connect to OpenSearch. Please check your configuration.")
        sys.exit(1)
    logger.info("Successfully connected to OpenSearch")

    # Create or recreate index. Single source of truth for the vector
    # dimension (was previously duplicated in two create_index calls).
    # NOTE(review): 1536 presumably matches the embedding model the
    # indexer uses — confirm against src.indexing.indexer.
    embedding_dim = 1536
    logger.info("Setting up index...")
    if os_client.index_exists():
        logger.warning("Index '%s' already exists", config.opensearch.index_name)
        response = input("Do you want to delete and recreate it? (yes/no): ")
        # strip() so trailing whitespace in the answer is accepted
        if response.strip().lower() in ("yes", "y"):
            logger.info("Deleting existing index...")
            os_client.delete_index()
            os_client.create_index(embedding_dim=embedding_dim)
        else:
            logger.info("Using existing index")
    else:
        os_client.create_index(embedding_dim=embedding_dim)

    # Load documents
    logger.info("Loading markdown documents...")
    loader = MarkdownDocumentLoader(config.document_processing.documents_path)
    documents = loader.load_documents()
    if not documents:
        logger.error("No documents loaded. Exiting.")
        sys.exit(1)
    logger.info("Loaded %d documents", len(documents))

    # Chunk documents
    logger.info("Chunking documents...")
    chunker = SemanticChunker(
        chunk_size=config.document_processing.chunk_size,
        chunk_overlap=config.document_processing.chunk_overlap,
        min_chunk_size=config.document_processing.min_chunk_size,
    )
    chunked_documents = chunker.chunk_documents(documents)
    logger.info("Created %d chunks", len(chunked_documents))

    # Index documents
    logger.info("Indexing documents in OpenSearch...")
    indexer = DocumentIndexer(
        opensearch_config=config.opensearch,
        llm_config=config.llm,
    )
    indexed_count = indexer.index_documents(chunked_documents)
    logger.info("Successfully indexed %d document chunks", indexed_count)

    # Verify the index actually contains the documents we just wrote
    final_count = indexer.get_document_count()
    logger.info("Total documents in index: %d", final_count)
    logger.info("✅ Document ingestion completed successfully!")
# Script entry point: run the ingestion workflow when executed directly.
if __name__ == "__main__":
    main()