#!/usr/bin/env python3
"""
Main ingestion script for the book ingestion pipeline with forced re-ingestion.
"""
import argparse
import asyncio
import logging
import os
import sys
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
# Add parent directory to path
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from app.ingestion.reader import read_markdown_files, extract_text_from_markdown
from app.ingestion.chunker import chunk_text
from app.ingestion.embedder import generate_embedding
from app.ingestion.storage import store_chunk, upsert_embedding
from app.db.qdrant import QdrantDB # <-- Yeh import add karo
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
async def main():
    """Run the book ingestion pipeline end to end.

    Reads markdown files from ``--source-dir``, chunks them, stores each
    chunk in the relational store, embeds it, and upserts the embedding
    into Qdrant. With ``--force-reingest`` the Qdrant collection is
    deleted first so the run starts from a clean slate; with ``--dry-run``
    files are read and chunked but nothing is stored.

    Returns:
        True on success, False if ingestion failed.
    """
    parser = argparse.ArgumentParser(description="Book Ingestion Pipeline")
    parser.add_argument("--source-dir", required=True, help="Path to markdown files directory")
    parser.add_argument("--book-id", required=True, help="Book identifier")
    parser.add_argument("--chunk-size", type=int, default=400)
    parser.add_argument("--overlap", type=int, default=50)
    parser.add_argument("--run-identifier", help="Custom run ID")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--force-reingest", action="store_true",
                        help="Delete existing collection before ingestion")
    args = parser.parse_args()

    logger.info("Starting ingestion for book: %s", args.book_id)
    logger.info("Source directory: %s", args.source_dir)

    # A caller-supplied run ID wins; otherwise derive one from book + timestamp.
    run_identifier = args.run_identifier or f"{args.book_id}-{datetime.now().strftime('%Y%m%d-%H%M')}"
    logger.info("Run identifier: %s", run_identifier)

    # Initialize Qdrant client; collection name comes from the environment.
    qdrant_db = QdrantDB()
    collection_name = os.getenv("QDRANT_COLLECTION_NAME", "test-clustor")

    # Force re-ingest: drop any existing collection so old vectors cannot
    # bleed into the fresh run. Deletion failure (e.g. collection absent)
    # is deliberately non-fatal.
    if args.force_reingest:
        logger.info("Force re-ingest enabled: Deleting existing collection '%s'...", collection_name)
        try:
            await qdrant_db.delete_collection(collection_name)
            logger.info("Successfully deleted collection '%s'", collection_name)
        except Exception as e:
            logger.warning("Could not delete collection (might not exist): %s", e)

    # Ensure the collection exists; creating an already-existing collection
    # raises, which we treat as "ready".
    try:
        await qdrant_db.create_collection(collection_name)
        logger.info("Collection '%s' ready for ingestion", collection_name)
    except Exception as e:
        logger.info("Collection already exists or error creating: %s", e)

    if args.dry_run:
        logger.info("DRY RUN MODE ENABLED")

    total_files = processed_files = total_chunks = chunks_stored = embeddings_stored = 0
    try:
        markdown_files = read_markdown_files(args.source_dir)
        total_files = len(markdown_files)
        logger.info("Found %s markdown files", total_files)

        for i, md_file in enumerate(markdown_files):
            logger.info("Processing %s/%s: %s", i + 1, total_files, md_file['file_path'])
            text_content = extract_text_from_markdown(md_file['content'])
            chunks = chunk_text(
                text=text_content,
                source_file=md_file['file_path'],
                chunk_size=args.chunk_size,
                overlap=args.overlap
            )
            # Chapter is per-file; compute it once rather than per chunk.
            chapter = extract_chapter_from_path(md_file['file_path'])

            for j, chunk in enumerate(chunks):
                # Count every produced chunk up front so the summary is
                # accurate in dry-run mode and when a chunk errors out.
                total_chunks += 1
                if args.dry_run:
                    continue
                try:
                    section = extract_section_from_content(text_content, j)
                    # Store chunk row in the relational store (Neon).
                    chunk_id = await store_chunk({
                        **chunk,
                        'book_id': args.book_id,
                        'chapter': chapter,
                        'section': section
                    })
                    if chunk_id:
                        chunks_stored += 1

                    # Generate embedding for the chunk text.
                    embedding = await generate_embedding(chunk['content'])

                    # "content" is kept first in the payload so retrieval
                    # results surface the text immediately.
                    payload = {
                        "content": chunk['content'],
                        "chunk_id": chunk_id,
                        "chunk_hash": chunk['chunk_hash'],
                        "book_id": args.book_id,
                        "chapter": chapter,
                        "section": section,
                        "source_file": chunk['source_file'],
                        "chunk_index": chunk['chunk_index']
                    }

                    # Upsert vector + payload into Qdrant.
                    success = await upsert_embedding(embedding, payload)
                    if success:
                        embeddings_stored += 1
                except Exception as e:
                    # A bad chunk must not abort the whole run.
                    logger.error(f"Error processing chunk: {e}")
                    continue

            processed_files += 1

        logger.info("=== INGESTION SUMMARY ===")
        logger.info("Files processed: %s/%s", processed_files, total_files)
        logger.info("Total chunks: %s", total_chunks)
        logger.info("Chunks stored: %s", chunks_stored)
        logger.info("Embeddings stored: %s", embeddings_stored)
        logger.info("INGESTION COMPLETE WITH FRESH DATA!")
    except Exception as e:
        logger.error(f"Ingestion failed: {e}")
        return False
    return True
def extract_chapter_from_path(file_path: str) -> str:
    """Derive a chapter name from a markdown file path.

    The immediate parent directory name is used as the chapter; files with
    no directory component fall back to ``"root"``.

    Args:
        file_path: Path to the markdown file (relative or absolute).

    Returns:
        The parent directory's basename, or ``"root"``.
    """
    # `os` is already imported at module level; the previous function-local
    # `import os.path` was redundant.
    directory = os.path.dirname(file_path)
    return os.path.basename(directory) if directory else "root"
def extract_section_from_content(content: str, chunk_index: int) -> str:
    """Build a positional section label for the chunk at ``chunk_index``.

    NOTE(review): the chunk's text is currently ignored — the label is
    derived purely from the chunk's position within the file.
    """
    return "Section_{}".format(chunk_index)
if __name__ == "__main__":
    # Run the async pipeline and map its boolean result onto the
    # conventional process exit status (0 = success, 1 = failure).
    exit_code = 0 if asyncio.run(main()) else 1
    sys.exit(exit_code)