#!/usr/bin/env python3
"""
Main entry point for the book ingestion pipeline, with optional forced re-ingestion.
"""

import argparse
import asyncio
import logging
import os
import sys
from datetime import datetime

from dotenv import load_dotenv

load_dotenv()

# Add parent directory to path
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from app.ingestion.reader import read_markdown_files, extract_text_from_markdown
from app.ingestion.chunker import chunk_text
from app.ingestion.embedder import generate_embedding
from app.ingestion.storage import store_chunk, upsert_embedding
from app.db.qdrant import QdrantDB  # <-- add this import (used for collection delete/create below)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


async def main():
    parser = argparse.ArgumentParser(description="Book Ingestion Pipeline")
    parser.add_argument("--source-dir", required=True, help="Path to markdown files directory")
    parser.add_argument("--book-id", required=True, help="Book identifier")
    parser.add_argument("--chunk-size", type=int, default=400)
    parser.add_argument("--overlap", type=int, default=50)
    parser.add_argument("--run-identifier", help="Custom run ID")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--force-reingest", action="store_true",  # <-- new flag
                        help="Delete existing collection before ingestion")
    args = parser.parse_args()

    logger.info(f"Starting ingestion for book: {args.book_id}")
    logger.info(f"Source directory: {args.source_dir}")

    run_identifier = args.run_identifier or f"{args.book_id}-{datetime.now().strftime('%Y%m%d-%H%M')}"
    logger.info(f"Run identifier: {run_identifier}")

    # Initialize Qdrant client
    qdrant_db = QdrantDB()
    collection_name = os.getenv("QDRANT_COLLECTION_NAME", "test-clustor")

    # FORCE RE-INGEST: Delete old collection if flag is set
    if args.force_reingest:
        logger.info(f"Force re-ingest enabled: Deleting existing collection '{collection_name}'...")
        try:
            await qdrant_db.delete_collection(collection_name)
            logger.info(f"Successfully deleted collection '{collection_name}'")
        except Exception as e:
            logger.warning(f"Could not delete collection (might not exist): {e}")

    # Recreate collection (optional but recommended)
    try:
        await qdrant_db.create_collection(collection_name)
        logger.info(f"Collection '{collection_name}' ready for ingestion")
    except Exception as e:
        logger.info(f"Collection already exists or error creating: {e}")

    if args.dry_run:
        logger.info("DRY RUN MODE ENABLED")

    total_files = processed_files = total_chunks = chunks_stored = embeddings_stored = 0

    try:
        markdown_files = read_markdown_files(args.source_dir)
        total_files = len(markdown_files)
        logger.info(f"Found {total_files} markdown files")

        for i, md_file in enumerate(markdown_files):
            logger.info(f"Processing {i+1}/{total_files}: {md_file['file_path']}")
            text_content = extract_text_from_markdown(md_file['content'])
            chunks = chunk_text(
                text=text_content,
                source_file=md_file['file_path'],
                chunk_size=args.chunk_size,
                overlap=args.overlap
            )

            for j, chunk in enumerate(chunks):
                if args.dry_run:
                    continue
                try:
                    # Store in Neon
                    chunk_id = await store_chunk({
                        **chunk,
                        'book_id': args.book_id,
                        'chapter': extract_chapter_from_path(md_file['file_path']),
                        'section': extract_section_from_content(text_content, j)
                    })
                    if chunk_id:
                        chunks_stored += 1

                    # Generate embedding
                    embedding = await generate_embedding(chunk['content'])

                    # The chunk content goes at the top of the payload and is essential
                    payload = {
                        "content": chunk['content'],  # <-- keep this first
                        "chunk_id": chunk_id,
                        "chunk_hash": chunk['chunk_hash'],
                        "book_id": args.book_id,
                        "chapter": extract_chapter_from_path(md_file['file_path']),
                        "section": extract_section_from_content(text_content, j),
                        "source_file": chunk['source_file'],
                        "chunk_index": chunk['chunk_index']
                    }

                    # Upsert in Qdrant
                    success = await upsert_embedding(embedding, payload)
                    if success:
                        embeddings_stored += 1
                    total_chunks += 1
                except Exception as e:
                    logger.error(f"Error processing chunk: {e}")
                    continue

            processed_files += 1
| logger.info("=== INGESTION SUMMARY ===") | |
| logger.info(f"Files processed: {processed_files}/{total_files}") | |
| logger.info(f"Total chunks: {total_chunks}") | |
| logger.info(f"Chunks stored: {chunks_stored}") | |
| logger.info(f"Embeddings stored: {embeddings_stored}") | |
| logger.info("INGESTION COMPLETE WITH FRESH DATA!") | |
| except Exception as e: | |
| logger.error(f"Ingestion failed: {e}") | |
| return False | |
| return True | |


def extract_chapter_from_path(file_path: str) -> str:
    """Use the name of the file's parent directory as the chapter label."""
    directory = os.path.dirname(file_path)
    return os.path.basename(directory) if directory else "root"
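
# Illustrative behaviour (the paths below are hypothetical examples, not repo files):
#   extract_chapter_from_path("books/chapter-01/intro.md")  -> "chapter-01"
#   extract_chapter_from_path("intro.md")                   -> "root"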


def extract_section_from_content(content: str, chunk_index: int) -> str:
    """Placeholder section label derived from the chunk index; `content` is currently unused."""
    return f"Section_{chunk_index}"
| if __name__ == "__main__": | |
| success = asyncio.run(main()) | |
| sys.exit(0 if success else 1) |