File size: 6,243 Bytes
36102cc
 
647e1bd
36102cc
 
 
 
 
 
 
 
 
 
647e1bd
 
36102cc
 
 
647e1bd
36102cc
647e1bd
 
36102cc
 
 
 
 
 
 
 
 
 
647e1bd
 
 
 
 
 
 
 
36102cc
 
 
 
 
 
647e1bd
36102cc
 
647e1bd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
36102cc
 
 
 
 
 
 
647e1bd
36102cc
 
 
 
 
 
 
 
 
 
 
 
 
 
647e1bd
36102cc
 
 
647e1bd
 
36102cc
 
 
 
 
647e1bd
36102cc
 
647e1bd
36102cc
647e1bd
 
 
 
 
 
 
 
 
36102cc
647e1bd
36102cc
 
 
 
 
 
 
647e1bd
36102cc
 
 
 
 
 
647e1bd
 
 
 
36102cc
 
 
 
647e1bd
 
36102cc
 
 
 
 
647e1bd
36102cc
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#!/usr/bin/env python3
"""
Main ingestion script for the book ingestion pipeline with forced re-ingestion.
"""

import argparse
import asyncio
import logging
import os
import sys
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()

# Add parent directory to path
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

from app.ingestion.reader import read_markdown_files, extract_text_from_markdown
from app.ingestion.chunker import chunk_text
from app.ingestion.embedder import generate_embedding
from app.ingestion.storage import store_chunk, upsert_embedding
from app.db.qdrant import QdrantDB  # needed for force re-ingest collection management

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


async def main():
    """Run the book ingestion pipeline end to end.

    Reads markdown files from --source-dir, chunks them, stores chunks in
    the relational store, generates embeddings, and upserts them into
    Qdrant. Supports --force-reingest (drop the collection first) and
    --dry-run (report work without writing anything).

    Returns:
        bool: True on success, False when the ingestion loop fails.
    """
    parser = argparse.ArgumentParser(description="Book Ingestion Pipeline")
    parser.add_argument("--source-dir", required=True, help="Path to markdown files directory")
    parser.add_argument("--book-id", required=True, help="Book identifier")
    parser.add_argument("--chunk-size", type=int, default=400)
    parser.add_argument("--overlap", type=int, default=50)
    parser.add_argument("--run-identifier", help="Custom run ID")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--force-reingest", action="store_true",
                        help="Delete existing collection before ingestion")

    args = parser.parse_args()

    logger.info(f"Starting ingestion for book: {args.book_id}")
    logger.info(f"Source directory: {args.source_dir}")

    # Derive a run identifier when the caller did not supply one.
    run_identifier = args.run_identifier or f"{args.book_id}-{datetime.now().strftime('%Y%m%d-%H%M')}"
    logger.info(f"Run identifier: {run_identifier}")

    # Initialize the Qdrant client and resolve the target collection name.
    qdrant_db = QdrantDB()
    collection_name = os.getenv("QDRANT_COLLECTION_NAME", "test-clustor")

    # Force re-ingest: delete the old collection so stale vectors are gone.
    if args.force_reingest:
        logger.info(f"Force re-ingest enabled: Deleting existing collection '{collection_name}'...")
        try:
            await qdrant_db.delete_collection(collection_name)
            logger.info(f"Successfully deleted collection '{collection_name}'")
        except Exception as e:
            # Best-effort: the collection may simply not exist yet.
            logger.warning(f"Could not delete collection (might not exist): {e}")

    # Recreate the collection (optional but recommended after a delete).
    try:
        await qdrant_db.create_collection(collection_name)
        logger.info(f"Collection '{collection_name}' ready for ingestion")
    except Exception as e:
        logger.info(f"Collection already exists or error creating: {e}")

    if args.dry_run:
        logger.info("DRY RUN MODE ENABLED")

    total_files = processed_files = total_chunks = chunks_stored = embeddings_stored = 0

    try:
        markdown_files = read_markdown_files(args.source_dir)
        total_files = len(markdown_files)
        logger.info(f"Found {total_files} markdown files")

        for i, md_file in enumerate(markdown_files):
            logger.info(f"Processing {i+1}/{total_files}: {md_file['file_path']}")

            text_content = extract_text_from_markdown(md_file['content'])
            chunks = chunk_text(
                text=text_content,
                source_file=md_file['file_path'],
                chunk_size=args.chunk_size,
                overlap=args.overlap
            )

            for j, chunk in enumerate(chunks):
                # Count every chunk up front so the summary is accurate even
                # in dry-run mode and when a chunk fails mid-processing
                # (previously only successfully embedded chunks were counted).
                total_chunks += 1

                if args.dry_run:
                    continue

                try:
                    # Persist the chunk row in the relational store (Neon).
                    chunk_id = await store_chunk({
                        **chunk,
                        'book_id': args.book_id,
                        'chapter': extract_chapter_from_path(md_file['file_path']),
                        'section': extract_section_from_content(text_content, j)
                    })

                    if chunk_id:
                        chunks_stored += 1

                    # Generate the embedding vector for this chunk's text.
                    embedding = await generate_embedding(chunk['content'])

                    # Put the raw content first in the payload — it is the
                    # field retrieval consumers need most.
                    payload = {
                        "content": chunk['content'],
                        "chunk_id": chunk_id,
                        "chunk_hash": chunk['chunk_hash'],
                        "book_id": args.book_id,
                        "chapter": extract_chapter_from_path(md_file['file_path']),
                        "section": extract_section_from_content(text_content, j),
                        "source_file": chunk['source_file'],
                        "chunk_index": chunk['chunk_index']
                    }

                    # Upsert the vector + payload into Qdrant.
                    success = await upsert_embedding(embedding, payload)
                    if success:
                        embeddings_stored += 1

                except Exception as e:
                    # Skip the failed chunk but keep ingesting the rest.
                    logger.error(f"Error processing chunk: {e}")
                    continue

            processed_files += 1

        logger.info("=== INGESTION SUMMARY ===")
        logger.info(f"Files processed: {processed_files}/{total_files}")
        logger.info(f"Total chunks: {total_chunks}")
        logger.info(f"Chunks stored: {chunks_stored}")
        logger.info(f"Embeddings stored: {embeddings_stored}")
        logger.info("INGESTION COMPLETE WITH FRESH DATA!")

    except Exception as e:
        logger.error(f"Ingestion failed: {e}")
        return False

    return True


def extract_chapter_from_path(file_path: str) -> str:
    """Return the chapter name derived from a markdown file's path.

    The chapter is taken to be the immediate parent directory of the
    file; files sitting directly in the source root map to "root".

    Args:
        file_path: Relative or absolute path to a markdown file.

    Returns:
        The parent directory's base name, or "root" when there is none.
    """
    # `os` is imported at module level; the redundant local
    # `import os.path` has been removed.
    directory = os.path.dirname(file_path)
    return os.path.basename(directory) if directory else "root"


def extract_section_from_content(content: str, chunk_index: int) -> str:
    """Return a synthetic section label for the chunk at *chunk_index*.

    NOTE(review): *content* is currently unused — the label is derived
    solely from the chunk's position; kept for interface compatibility.
    """
    return "Section_" + str(chunk_index)


if __name__ == "__main__":
    # Run the async entry point and map success/failure onto exit codes 0/1.
    succeeded = asyncio.run(main())
    sys.exit(0 if succeeded else 1)