# AsyncRAG / ingest.py
# Author: Zubaish — commit "update" (2194516)
import os
import shutil
import traceback

from huggingface_hub import hf_hub_download, list_repo_files
from langchain_chroma import Chroma
from langchain_community.document_loaders import Docx2txtLoader
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

from config import KB_DIR, HF_DATASET_REPO, EMBEDDING_MODEL, CHROMA_DIR, CHUNK_SIZE, CHUNK_OVERLAP, HF_TOKEN
def run_ingestion():
    """Build the persistent Chroma vector store from a Hugging Face dataset.

    Pipeline: wipe KB_DIR and CHROMA_DIR, download every ``.docx`` file from
    HF_DATASET_REPO, extract its text, split it into overlapping chunks,
    embed the chunks with EMBEDDING_MODEL, and persist the resulting Chroma
    database to CHROMA_DIR.

    Returns None; progress and errors are reported via print().
    """
    # 1. Environment cleanup & setup — start from a clean slate so stale
    # documents or an old vector store never leak into the new index.
    # Paths come from config (e.g. /app/kb and /app/chroma_db).
    for path in (KB_DIR, CHROMA_DIR):
        if os.path.exists(path):
            shutil.rmtree(path)
        os.makedirs(path, exist_ok=True)

    print(f"⬇️ Listing files in repository: {HF_DATASET_REPO}...")
    try:
        # 2. Direct file download (bypassing load_dataset to avoid PDF errors).
        # Only .docx files are fetched; everything else in the repo is ignored.
        all_files = list_repo_files(repo_id=HF_DATASET_REPO, repo_type="dataset", token=HF_TOKEN)
        docx_files = [f for f in all_files if f.lower().endswith(".docx")]
        if not docx_files:
            print("❌ Error: No .docx files found in the dataset repository.")
            return

        docs = []
        for file_name in docx_files:
            print(f"📂 Downloading {file_name}...")
            # Download lands in the HF cache first...
            temp_path = hf_hub_download(
                repo_id=HF_DATASET_REPO,
                filename=file_name,
                repo_type="dataset",
                token=HF_TOKEN,
            )
            # ...then copy it to our predictable KB_DIR location.
            local_docx = os.path.join(KB_DIR, os.path.basename(file_name))
            shutil.copy(temp_path, local_docx)

            # 3. Load text from the .docx (images are ignored automatically).
            loader = Docx2txtLoader(local_docx)
            docs.extend(loader.load())
            print(f"✅ Text extracted from: {file_name}")

        if not docs:
            print("❌ Error: Extracted document list is empty.")
            return

        # 4. Text splitting; chunk size/overlap tuned for RAG context windows.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP,
            add_start_index=True,
        )
        splits = text_splitter.split_documents(docs)
        print(f"✂️ Split into {len(splits)} text chunks.")

        # 5. Embedding & vector store creation.
        print(f"🧠 Generating embeddings with {EMBEDDING_MODEL}...")
        embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)

        # Persist to the directory specified in config (e.g. /app/chroma_db).
        print(f"💾 Saving Vector Database to {CHROMA_DIR}...")
        Chroma.from_documents(
            documents=splits,
            embedding=embeddings,
            persist_directory=CHROMA_DIR,
        )
        print("✨ Knowledge base fully initialized and saved.")
    except Exception as e:
        # Top-level boundary: summarize AND dump the full traceback so
        # failures inside remote downloads or embedding are diagnosable.
        print(f"❌ CRITICAL INGESTION ERROR: {e}")
        traceback.print_exc()
if __name__ == "__main__":
    # Allow running the ingestion pipeline directly: `python ingest.py`.
    run_ingestion()