#!/usr/bin/env python3
"""
Main ingestion script for the book ingestion pipeline with forced re-ingestion.
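
Usage (paths and IDs are illustrative):
    python scripts/ingest_book.py --source-dir ./books/my-book --book-id my-book
    Optional flags: --chunk-size, --overlap, --run-identifier, --dry-run, --force-reingest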
"""
import argparse
import asyncio
import logging
import os
import sys
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
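
# Environment is loaded from a local .env file. QDRANT_COLLECTION_NAME is read
# below; connection credentials are presumably consumed inside QdrantDB and the
# app.ingestion.storage helpers.
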
# Add parent directory to path
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from app.ingestion.reader import read_markdown_files, extract_text_from_markdown
from app.ingestion.chunker import chunk_text
from app.ingestion.embedder import generate_embedding
from app.ingestion.storage import store_chunk, upsert_embedding
from app.db.qdrant import QdrantDB  # Qdrant client wrapper used below
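
# For reference only: QdrantDB is assumed to wrap qdrant-client's
# AsyncQdrantClient roughly like the sketch below. The env var names and the
# vector dimension are assumptions, not the actual app.db.qdrant code.
#
#     from qdrant_client import AsyncQdrantClient
#     from qdrant_client.models import Distance, VectorParams
#
#     class QdrantDB:
#         def __init__(self):
#             self.client = AsyncQdrantClient(
#                 url=os.getenv("QDRANT_URL"),          # assumed env var
#                 api_key=os.getenv("QDRANT_API_KEY"),  # assumed env var
#             )
#
#         async def delete_collection(self, name: str) -> None:
#             await self.client.delete_collection(collection_name=name)
#
#         async def create_collection(self, name: str, dim: int = 1536) -> None:
#             # 1536 is a placeholder; the size must match the embedding model
#             await self.client.create_collection(
#                 collection_name=name,
#                 vectors_config=VectorParams(size=dim, distance=Distance.COSINE),
#             )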

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


async def main():
    parser = argparse.ArgumentParser(description="Book Ingestion Pipeline")
    parser.add_argument("--source-dir", required=True, help="Path to markdown files directory")
    parser.add_argument("--book-id", required=True, help="Book identifier")
    parser.add_argument("--chunk-size", type=int, default=400)
    parser.add_argument("--overlap", type=int, default=50)
    parser.add_argument("--run-identifier", help="Custom run ID")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--force-reingest", action="store_true",
                        help="Delete existing collection before ingestion")
    args = parser.parse_args()
logger.info(f"Starting ingestion for book: {args.book_id}")
logger.info(f"Source directory: {args.source_dir}")
run_identifier = args.run_identifier or f"{args.book_id}-{datetime.now().strftime('%Y%m%d-%H%M')}"
logger.info(f"Run identifier: {run_identifier}")
# Initialize Qdrant client
qdrant_db = QdrantDB()
collection_name = os.getenv("QDRANT_COLLECTION_NAME", "test-clustor")

    # Force re-ingest: delete the existing collection if the flag is set
    if args.force_reingest:
        logger.info(f"Force re-ingest enabled: deleting existing collection '{collection_name}'...")
        try:
            await qdrant_db.delete_collection(collection_name)
            logger.info(f"Successfully deleted collection '{collection_name}'")
        except Exception as e:
            logger.warning(f"Could not delete collection (it may not exist): {e}")

    # Recreate the collection (optional but recommended)
    try:
        await qdrant_db.create_collection(collection_name)
        logger.info(f"Collection '{collection_name}' ready for ingestion")
    except Exception as e:
        logger.info(f"Collection already exists or could not be created: {e}")

    if args.dry_run:
        logger.info("DRY RUN MODE ENABLED")

    total_files = processed_files = total_chunks = chunks_stored = embeddings_stored = 0

    try:
        markdown_files = read_markdown_files(args.source_dir)
        total_files = len(markdown_files)
        logger.info(f"Found {total_files} markdown files")

        for i, md_file in enumerate(markdown_files):
            logger.info(f"Processing {i + 1}/{total_files}: {md_file['file_path']}")
            text_content = extract_text_from_markdown(md_file['content'])
            chunks = chunk_text(
                text=text_content,
                source_file=md_file['file_path'],
                chunk_size=args.chunk_size,
                overlap=args.overlap
            )
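
            # Each chunk is expected to be a dict with at least 'content',
            # 'chunk_hash', 'source_file', and 'chunk_index'; those keys are
            # read below when building the Qdrant payload.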
            for j, chunk in enumerate(chunks):
                total_chunks += 1  # count chunks even in dry-run mode
                if args.dry_run:
                    continue
                try:
                    # Store the chunk row in Neon
                    chunk_id = await store_chunk({
                        **chunk,
                        'book_id': args.book_id,
                        'chapter': extract_chapter_from_path(md_file['file_path']),
                        'section': extract_section_from_content(text_content, j)
                    })
                    if chunk_id:
                        chunks_stored += 1

                    # Generate the embedding for this chunk's text
                    embedding = await generate_embedding(chunk['content'])

                    # The content goes first in the payload; it is the most
                    # important field.
                    payload = {
                        "content": chunk['content'],  # keep this key first
                        "chunk_id": chunk_id,
                        "chunk_hash": chunk['chunk_hash'],
                        "book_id": args.book_id,
                        "chapter": extract_chapter_from_path(md_file['file_path']),
                        "section": extract_section_from_content(text_content, j),
                        "source_file": chunk['source_file'],
                        "chunk_index": chunk['chunk_index']
                    }

                    # Upsert the vector and payload into Qdrant
                    success = await upsert_embedding(embedding, payload)
                    if success:
                        embeddings_stored += 1
                except Exception as e:
                    logger.error(f"Error processing chunk: {e}")
                    continue

            processed_files += 1
logger.info("=== INGESTION SUMMARY ===")
logger.info(f"Files processed: {processed_files}/{total_files}")
logger.info(f"Total chunks: {total_chunks}")
logger.info(f"Chunks stored: {chunks_stored}")
logger.info(f"Embeddings stored: {embeddings_stored}")
logger.info("INGESTION COMPLETE WITH FRESH DATA!")
except Exception as e:
logger.error(f"Ingestion failed: {e}")
return False
return True


def extract_chapter_from_path(file_path: str) -> str:
    """Use the name of the file's parent directory as the chapter label."""
    directory = os.path.dirname(file_path)
    return os.path.basename(directory) if directory else "root"


def extract_section_from_content(content: str, chunk_index: int) -> str:
    """Placeholder: label sections by chunk index; content is currently unused."""
    return f"Section_{chunk_index}"


if __name__ == "__main__":
    success = asyncio.run(main())
    sys.exit(0 if success else 1)
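
# Typical workflow (paths and IDs are illustrative):
#   1. Preview what would be ingested:
#        python scripts/ingest_book.py --source-dir ./books/my-book --book-id my-book --dry-run
#   2. Wipe the collection and ingest fresh data:
#        python scripts/ingest_book.py --source-dir ./books/my-book --book-id my-book --force-reingest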