| import os |
| import asyncio |
| from typing import List, Dict |
| from app.utils.text_processing import extract_chapters_and_sections, split_text_into_chunks, clean_markdown |
| from app.services.rag_service import rag_pipeline |
|
|
async def ingest_book_content(file_path: str) -> List[str]:
    """
    Read a markdown book, break it into chunks and push them to the vector store.

    The file is split into sections with ``extract_chapters_and_sections``;
    each section's text is cleaned with ``clean_markdown``, sections that are
    blank after cleaning are dropped, and the remaining text is cut into
    chunks with ``split_text_into_chunks``. Every chunk becomes one document
    carrying the section's title/chapter/section/subsection metadata.

    Args:
        file_path: Path to the markdown file containing the book content

    Returns:
        Whatever ``rag_pipeline.ingest_documents_batch`` returns for the
        prepared documents (annotated as a list of document IDs)
    """
    with open(file_path, 'r', encoding='utf-8') as book_file:
        raw_markdown = book_file.read()

    prepared_docs = []

    for entry in extract_chapters_and_sections(raw_markdown):
        body_text = clean_markdown(entry["content"])

        # Nothing left after cleaning (empty or whitespace only): skip it.
        if not body_text.strip():
            continue

        pieces = split_text_into_chunks(body_text, chunk_size=800, overlap=100)

        # One document per chunk; the title gets a 1-based "(part N)" suffix
        # only when the section was split into more than one chunk.
        prepared_docs.extend(
            {
                "title": entry["title"] + ("" if len(pieces) <= 1 else f" (part {index + 1})"),
                "content": piece,
                "chapter": entry["chapter"],
                "section": entry["section"],
                "subsection": entry["subsection"],
            }
            for index, piece in enumerate(pieces)
        )

    print(f"Prepared {len(prepared_docs)} document chunks for ingestion...")
    return await rag_pipeline.ingest_documents_batch(prepared_docs)
|
|
async def initialize_knowledge_base_async():
    """
    Ingest ``book_knowledge_base.md`` into the knowledge base, if it exists.

    The book is looked up three directory levels above this module's file.

    Returns:
        The value returned by ``ingest_book_content`` for the book file, or
        an empty list when the book file is not found.
    """
    # Walk up three directory levels from this file to reach the book's folder.
    base_dir = os.path.dirname(__file__)
    base_dir = os.path.dirname(base_dir)
    base_dir = os.path.dirname(base_dir)
    book_path = os.path.join(base_dir, "book_knowledge_base.md")

    # Guard clause: without the book there is nothing to ingest.
    if not os.path.exists(book_path):
        print(f"Book file not found at {book_path}")
        return []

    print("Ingesting book content into the knowledge base...")
    ingested_ids = await ingest_book_content(book_path)
    print(f"Successfully ingested {len(ingested_ids)} documents into the knowledge base.")
    return ingested_ids
|
|
| def initialize_knowledge_base(): |
| """ |
| Synchronous wrapper to initialize the knowledge base |
| Can be called from non-async contexts |
| """ |
| try: |
| |
| try: |
| loop = asyncio.get_running_loop() |
| |
| return asyncio.create_task(initialize_knowledge_base_async()) |
| except RuntimeError: |
| |
| return asyncio.run(initialize_knowledge_base_async()) |
| except Exception as e: |
| print(f"Error initializing knowledge base: {e}") |
| import traceback |
| traceback.print_exc() |
| return [] |
|
|
if __name__ == "__main__":
    # Executed as a script: run the async initializer to completion on a
    # fresh event loop. Errors propagate to the interpreter.
    asyncio.run(
        initialize_knowledge_base_async()
    )