# chatbot/app/services/document_ingestion.py
# Tahasaif3's picture
# Update app/services/document_ingestion.py
# e62ef42 verified
import os
import asyncio
from typing import List, Dict
from app.utils.text_processing import extract_chapters_and_sections, split_text_into_chunks, clean_markdown
from app.services.rag_service import rag_pipeline
async def ingest_book_content(
    file_path: str,
    *,
    chunk_size: int = 800,
    overlap: int = 100,
) -> List[str]:
    """
    Ingest the book content from a markdown file into the vector store.

    The file is read as UTF-8, split into sections with
    ``extract_chapters_and_sections``, cleaned with ``clean_markdown``, cut
    into overlapping chunks with ``split_text_into_chunks`` and handed to
    ``rag_pipeline.ingest_documents_batch`` in a single call.

    Args:
        file_path: Path to the markdown file containing the book content.
        chunk_size: Value forwarded to ``split_text_into_chunks`` as
            ``chunk_size``. Defaults to 800 (the previously hard-coded value).
        overlap: Value forwarded to ``split_text_into_chunks`` as ``overlap``.
            Defaults to 100 (the previously hard-coded value).

    Returns:
        Whatever ``rag_pipeline.ingest_documents_batch`` returns; callers treat
        it as the list of ingested document IDs.

    Raises:
        ValueError: If ``chunk_size`` is not positive, or ``overlap`` is
            negative or not smaller than ``chunk_size``.
        OSError: If the file cannot be opened or read.
    """
    # Reject nonsensical chunking parameters before doing any I/O; an overlap
    # that is not smaller than the chunk size cannot advance through the text.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if not 0 <= overlap < chunk_size:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < chunk_size, got {overlap}"
        )

    # Read the book content
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()

    # Prepare all documents first so they can be ingested in one batch
    documents_to_ingest: List[Dict] = []

    for section in extract_chapters_and_sections(content):
        clean_content = clean_markdown(section["content"])

        # Skip sections that are empty once the markdown has been cleaned
        if not clean_content.strip():
            continue

        chunks = split_text_into_chunks(
            clean_content, chunk_size=chunk_size, overlap=overlap
        )
        # Only sections that were actually split get a "(part N)" suffix
        is_multi_part = len(chunks) > 1

        for part_number, chunk in enumerate(chunks, start=1):
            title = section["title"]
            if is_multi_part:
                title += f" (part {part_number})"
            documents_to_ingest.append(
                {
                    "title": title,
                    "content": chunk,
                    "chapter": section["chapter"],
                    "section": section["section"],
                    "subsection": section["subsection"],
                }
            )

    # Ingest all documents using batch processing for better performance
    print(f"Prepared {len(documents_to_ingest)} document chunks for ingestion...")
    ingested_document_ids = await rag_pipeline.ingest_documents_batch(documents_to_ingest)
    return ingested_document_ids
async def initialize_knowledge_base_async():
    """
    Populate the knowledge base from the bundled book file, if it exists.

    The book is looked up as ``book_knowledge_base.md`` in the directory three
    levels above this module's file path.

    Returns:
        The result of ``ingest_book_content`` when the file exists, otherwise
        an empty list.
    """
    # Climb three directory levels from this module's own path
    module_dir = os.path.dirname(__file__)
    parent_dir = os.path.dirname(module_dir)
    base_dir = os.path.dirname(parent_dir)
    book_path = os.path.join(base_dir, "book_knowledge_base.md")

    # Nothing to ingest: report where we looked and bail out early
    if not os.path.exists(book_path):
        print(f"Book file not found at {book_path}")
        return []

    print("Ingesting book content into the knowledge base...")
    document_ids = await ingest_book_content(book_path)
    ingested_count = len(document_ids)
    print(f"Successfully ingested {ingested_count} documents into the knowledge base.")
    return document_ids
def initialize_knowledge_base():
    """
    Synchronous wrapper to initialize the knowledge base.

    Can be called from both async and non-async contexts:

    * When an event loop is already running in the current thread, the
      ingestion is scheduled on it and the ``asyncio.Task`` is returned
      immediately. The caller should keep a reference to that task (and
      await it to obtain the result); failures that happen inside the task
      are not caught by this function.
    * When no event loop is running, the ingestion is run to completion with
      ``asyncio.run`` and its result is returned.

    Returns:
        An ``asyncio.Task`` (running loop), the result of
        ``initialize_knowledge_base_async`` (no running loop), or an empty
        list if starting/running the ingestion raised an exception.
    """
    # Detect a running loop first, in its own narrow try block. Running the
    # ingestion inside the ``except RuntimeError`` handler would chain every
    # later failure to the harmless "no running event loop" error and clutter
    # the printed traceback.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        loop_is_running = False
    else:
        loop_is_running = True

    try:
        if loop_is_running:
            # If we're in an async context, return a task
            return asyncio.create_task(initialize_knowledge_base_async())
        # No event loop running, create one and run
        return asyncio.run(initialize_knowledge_base_async())
    except Exception as e:
        # Best-effort initialization: report the problem and fall back to
        # "nothing ingested" instead of propagating the error.
        print(f"Error initializing knowledge base: {e}")
        import traceback
        traceback.print_exc()
        return []
if __name__ == "__main__":
    # Script entry point: build the ingestion coroutine and drive it to
    # completion on a fresh event loop.
    startup_coroutine = initialize_knowledge_base_async()
    asyncio.run(startup_coroutine)