Tahasaif3 commited on
Commit
e62ef42
·
verified ·
1 Parent(s): 456d4ee

Update app/services/document_ingestion.py

Browse files
Files changed (1) hide show
  1. app/services/document_ingestion.py +38 -14
app/services/document_ingestion.py CHANGED
@@ -1,9 +1,10 @@
1
  import os
 
2
  from typing import List, Dict
3
  from app.utils.text_processing import extract_chapters_and_sections, split_text_into_chunks, clean_markdown
4
  from app.services.rag_service import rag_pipeline
5
 
6
- def ingest_book_content(file_path: str) -> List[str]:
7
  """
8
  Ingest the book content from a markdown file into the vector store
9
 
@@ -20,7 +21,8 @@ def ingest_book_content(file_path: str) -> List[str]:
20
  # Extract chapters and sections
21
  sections = extract_chapters_and_sections(content)
22
 
23
- ingested_document_ids = []
 
24
 
25
  # Process each section
26
  for section in sections:
@@ -34,7 +36,7 @@ def ingest_book_content(file_path: str) -> List[str]:
34
  # Split into chunks if the content is too long
35
  chunks = split_text_into_chunks(clean_content, chunk_size=800, overlap=100)
36
 
37
- # Ingest each chunk
38
  for i, chunk in enumerate(chunks):
39
  document = {
40
  "title": section["title"] + (f" (part {i+1})" if len(chunks) > 1 else ""),
@@ -43,31 +45,53 @@ def ingest_book_content(file_path: str) -> List[str]:
43
  "section": section["section"],
44
  "subsection": section["subsection"]
45
  }
46
-
47
- # Ingest the document
48
- doc_id = rag_pipeline.ingest_document(document)
49
- if doc_id:
50
- ingested_document_ids.append(doc_id)
51
 
52
  return ingested_document_ids
53
 
54
- def initialize_knowledge_base():
55
  """
56
- Initialize the knowledge base by ingesting the book content
57
  """
58
  # Define the path to the book knowledge base
59
- book_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
60
- "book_knowledge_base.md")
 
 
61
 
62
  if os.path.exists(book_path):
63
  print("Ingesting book content into the knowledge base...")
64
- document_ids = ingest_book_content(book_path)
65
  print(f"Successfully ingested {len(document_ids)} documents into the knowledge base.")
66
  return document_ids
67
  else:
68
  print(f"Book file not found at {book_path}")
69
  return []
70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71
  if __name__ == "__main__":
72
  # Initialize the knowledge base when the script is run directly
73
- initialize_knowledge_base()
 
1
  import os
2
+ import asyncio
3
  from typing import List, Dict
4
  from app.utils.text_processing import extract_chapters_and_sections, split_text_into_chunks, clean_markdown
5
  from app.services.rag_service import rag_pipeline
6
 
7
+ async def ingest_book_content(file_path: str) -> List[str]:
8
  """
9
  Ingest the book content from a markdown file into the vector store
10
 
 
21
  # Extract chapters and sections
22
  sections = extract_chapters_and_sections(content)
23
 
24
+ # Prepare all documents first
25
+ documents_to_ingest = []
26
 
27
  # Process each section
28
  for section in sections:
 
36
  # Split into chunks if the content is too long
37
  chunks = split_text_into_chunks(clean_content, chunk_size=800, overlap=100)
38
 
39
+ # Prepare each chunk as a document
40
  for i, chunk in enumerate(chunks):
41
  document = {
42
  "title": section["title"] + (f" (part {i+1})" if len(chunks) > 1 else ""),
 
45
  "section": section["section"],
46
  "subsection": section["subsection"]
47
  }
48
+ documents_to_ingest.append(document)
49
+
50
+ # Ingest all documents using batch processing for better performance
51
+ print(f"Prepared {len(documents_to_ingest)} document chunks for ingestion...")
52
+ ingested_document_ids = await rag_pipeline.ingest_documents_batch(documents_to_ingest)
53
 
54
  return ingested_document_ids
55
 
56
+ async def initialize_knowledge_base_async():
57
  """
58
+ Async function to initialize the knowledge base by ingesting the book content
59
  """
60
  # Define the path to the book knowledge base
61
+ book_path = os.path.join(
62
+ os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
63
+ "book_knowledge_base.md"
64
+ )
65
 
66
  if os.path.exists(book_path):
67
  print("Ingesting book content into the knowledge base...")
68
+ document_ids = await ingest_book_content(book_path)
69
  print(f"Successfully ingested {len(document_ids)} documents into the knowledge base.")
70
  return document_ids
71
  else:
72
  print(f"Book file not found at {book_path}")
73
  return []
74
 
75
+ def initialize_knowledge_base():
76
+ """
77
+ Synchronous wrapper to initialize the knowledge base
78
+ Can be called from non-async contexts
79
+ """
80
+ try:
81
+ # Check if an event loop is already running
82
+ try:
83
+ loop = asyncio.get_running_loop()
84
+ # If we're in an async context, return a task
85
+ return asyncio.create_task(initialize_knowledge_base_async())
86
+ except RuntimeError:
87
+ # No event loop running, create one and run
88
+ return asyncio.run(initialize_knowledge_base_async())
89
+ except Exception as e:
90
+ print(f"Error initializing knowledge base: {e}")
91
+ import traceback
92
+ traceback.print_exc()
93
+ return []
94
+
95
  if __name__ == "__main__":
96
  # Initialize the knowledge base when the script is run directly
97
+ asyncio.run(initialize_knowledge_base_async())