DenysKovalML committed on
Commit
5129eef
·
1 Parent(s): 207c4af

Fix: Remove qdrant_service parameter from index_chunks_to_qdrant

Browse files

All initialization code now uses the module-level `qdrant_service` singleton, so the in-memory Qdrant instance is shared between the indexer and the application runtime

demo/main.py CHANGED
@@ -13,7 +13,6 @@ if os.getenv("SPACE_ID"): # Detect HF Spaces environment
13
 
14
  sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
15
 
16
- from scientific_rag.application.rag.pipeline import RAGPipeline
17
  from scientific_rag.scripts.index_qdrant import index_qdrant
18
 
19
 
@@ -28,15 +27,22 @@ def initialize_qdrant_on_hf_spaces():
28
  raise FileNotFoundError(f"Required file not found: {chunks_file}")
29
 
30
  logger.info("Initializing Qdrant with chunks from local script...")
31
- index_qdrant(
32
- chunks_file=chunks_file,
33
- embedding_batch_size=32,
34
- upload_batch_size=100,
35
- create_collection=True,
36
- process_batch_size=10000,
37
- )
38
- logger.info("✅ Qdrant initialization complete!")
 
 
 
 
 
39
 
 
 
40
 
41
  MAIN_HEADER = """
42
  <div style="text-align: center; margin-bottom: 40px;">
@@ -217,7 +223,7 @@ try:
217
 
218
  rag_pipeline = RAGPipelineWrapper()
219
  except Exception as e:
220
- logger.error(f"Failed to initialize RAG pipeline: {e}")
221
  rag_pipeline = None
222
 
223
 
 
13
 
14
  sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
15
 
 
16
  from scientific_rag.scripts.index_qdrant import index_qdrant
17
 
18
 
 
27
  raise FileNotFoundError(f"Required file not found: {chunks_file}")
28
 
29
  logger.info("Initializing Qdrant with chunks from local script...")
30
+ try:
31
+ index_qdrant(
32
+ chunks_file=chunks_file,
33
+ embedding_batch_size=32,
34
+ upload_batch_size=100,
35
+ create_collection=True,
36
+ process_batch_size=10000,
37
+ )
38
+ logger.info("✅ Qdrant initialization complete!")
39
+ except Exception as e:
40
+ logger.error(f"Qdrant initialization failed: {e}", exc_info=True)
41
+ raise
42
+
43
 
44
+ # Import RAGPipeline AFTER setting up Qdrant
45
+ from scientific_rag.application.rag.pipeline import RAGPipeline
46
 
47
  MAIN_HEADER = """
48
  <div style="text-align: center; margin-bottom: 40px;">
 
223
 
224
  rag_pipeline = RAGPipelineWrapper()
225
  except Exception as e:
226
+ logger.error(f"Failed to initialize RAG pipeline: {e}", exc_info=True)
227
  rag_pipeline = None
228
 
229
 
src/scientific_rag/scripts/index_qdrant.py CHANGED
@@ -1,5 +1,5 @@
1
- from collections.abc import Iterator
2
  import json
 
3
  from pathlib import Path
4
  from typing import Any
5
 
@@ -9,11 +9,13 @@ from tqdm import tqdm
9
 
10
  from scientific_rag.application.embeddings.encoder import encoder
11
  from scientific_rag.domain.documents import PaperChunk
12
- from scientific_rag.infrastructure.qdrant import QdrantService
13
  from scientific_rag.settings import settings
14
 
15
 
16
- def load_chunks_generator(chunks_file: Path, batch_size: int = 10000) -> Iterator[list[PaperChunk]]:
 
 
17
  logger.info(f"Loading chunks from {chunks_file} in batches of {batch_size}")
18
 
19
  with open(chunks_file, encoding="utf-8") as f:
@@ -61,11 +63,15 @@ def embed_sparse_chunks(
61
  show_progress: bool = True,
62
  ) -> list[Any]:
63
  """Generate sparse BM25 embeddings for chunks."""
64
- logger.info(f"Embedding {len(chunks)} chunks (Sparse BM25) with batch size {batch_size}")
 
 
65
 
66
  texts = [chunk.text for chunk in chunks]
67
 
68
- sparse_embeddings = list(sparse_encoder.embed(documents=texts, batch_size=batch_size, parallel=None))
 
 
69
 
70
  logger.success(f"Generated sparse embeddings for {len(chunks)} chunks")
71
  return sparse_embeddings
@@ -74,14 +80,17 @@ def embed_sparse_chunks(
74
  def index_chunks_to_qdrant(
75
  chunks: list[PaperChunk],
76
  sparse_embeddings: list[Any],
77
- qdrant_service: QdrantService,
78
  batch_size: int = 100,
79
  show_progress: bool = True,
80
  ) -> int:
81
  """Upload chunks to Qdrant in batches."""
82
  total_uploaded = 0
83
 
84
- iterator = tqdm(range(0, len(chunks), batch_size), desc="Uploading to Qdrant", disable=not show_progress)
 
 
 
 
85
  for i in iterator:
86
  batch_chunks = chunks[i : i + batch_size]
87
 
@@ -89,7 +98,9 @@ def index_chunks_to_qdrant(
89
  if sparse_embeddings:
90
  batch_sparse = sparse_embeddings[i : i + batch_size]
91
 
92
- uploaded = qdrant_service.upsert_chunks(batch_chunks, sparse_embeddings=batch_sparse)
 
 
93
  total_uploaded += uploaded
94
 
95
  return total_uploaded
@@ -112,30 +123,43 @@ def index_qdrant(
112
  process_batch_size: Process chunks in batches of this size to manage memory
113
  """
114
  if chunks_file is None:
115
- chunks_file = Path(settings.root_dir) / "data" / "processed" / f"chunks_{settings.dataset_split}.json"
 
 
 
 
 
116
  else:
117
  chunks_file = Path(chunks_file)
118
 
119
  if not chunks_file.exists():
120
  raise FileNotFoundError(f"Chunks file not found: {chunks_file}")
121
 
122
- qdrant_service = QdrantService()
 
123
  if create_collection:
124
- qdrant_service.create_collection(vector_size=encoder.embedding_dim)
 
125
 
126
  logger.info(f"Initializing Sparse Encoder: {settings.sparse_embedding_model_name}")
127
- sparse_encoder = SparseTextEmbedding(model_name=settings.sparse_embedding_model_name)
 
 
128
 
129
  logger.info("Processing chunks in streaming batches to manage memory...")
130
  total_uploaded = 0
131
  batch_num = 0
132
 
133
- for batch_chunks in load_chunks_generator(chunks_file, batch_size=process_batch_size):
 
 
134
  batch_num += 1
135
  batch_start = (batch_num - 1) * process_batch_size
136
  batch_end = batch_start + len(batch_chunks)
137
 
138
- logger.info(f"--- Processing Batch {batch_num} (Chunks {batch_start}-{batch_end}) ---")
 
 
139
 
140
  batch_chunks = embed_chunks(
141
  chunks=batch_chunks,
@@ -154,13 +178,14 @@ def index_qdrant(
154
  batch_uploaded = index_chunks_to_qdrant(
155
  chunks=batch_chunks,
156
  sparse_embeddings=batch_sparse,
157
- qdrant_service=qdrant_service,
158
  batch_size=upload_batch_size,
159
  show_progress=True,
160
  )
161
  total_uploaded += batch_uploaded
162
 
163
- logger.success(f"Batch {batch_num} complete: {batch_uploaded} chunks uploaded (Total: {total_uploaded})")
 
 
164
 
165
  logger.info("Getting final statistics...")
166
  collection_info = qdrant_service.get_collection_info()
 
 
1
  import json
2
+ from collections.abc import Iterator
3
  from pathlib import Path
4
  from typing import Any
5
 
 
9
 
10
  from scientific_rag.application.embeddings.encoder import encoder
11
  from scientific_rag.domain.documents import PaperChunk
12
+ from scientific_rag.infrastructure.qdrant import qdrant_service
13
  from scientific_rag.settings import settings
14
 
15
 
16
+ def load_chunks_generator(
17
+ chunks_file: Path, batch_size: int = 10000
18
+ ) -> Iterator[list[PaperChunk]]:
19
  logger.info(f"Loading chunks from {chunks_file} in batches of {batch_size}")
20
 
21
  with open(chunks_file, encoding="utf-8") as f:
 
63
  show_progress: bool = True,
64
  ) -> list[Any]:
65
  """Generate sparse BM25 embeddings for chunks."""
66
+ logger.info(
67
+ f"Embedding {len(chunks)} chunks (Sparse BM25) with batch size {batch_size}"
68
+ )
69
 
70
  texts = [chunk.text for chunk in chunks]
71
 
72
+ sparse_embeddings = list(
73
+ sparse_encoder.embed(documents=texts, batch_size=batch_size, parallel=None)
74
+ )
75
 
76
  logger.success(f"Generated sparse embeddings for {len(chunks)} chunks")
77
  return sparse_embeddings
 
80
  def index_chunks_to_qdrant(
81
  chunks: list[PaperChunk],
82
  sparse_embeddings: list[Any],
 
83
  batch_size: int = 100,
84
  show_progress: bool = True,
85
  ) -> int:
86
  """Upload chunks to Qdrant in batches."""
87
  total_uploaded = 0
88
 
89
+ iterator = tqdm(
90
+ range(0, len(chunks), batch_size),
91
+ desc="Uploading to Qdrant",
92
+ disable=not show_progress,
93
+ )
94
  for i in iterator:
95
  batch_chunks = chunks[i : i + batch_size]
96
 
 
98
  if sparse_embeddings:
99
  batch_sparse = sparse_embeddings[i : i + batch_size]
100
 
101
+ uploaded = qdrant_service.upsert_chunks(
102
+ batch_chunks, sparse_embeddings=batch_sparse
103
+ )
104
  total_uploaded += uploaded
105
 
106
  return total_uploaded
 
123
  process_batch_size: Process chunks in batches of this size to manage memory
124
  """
125
  if chunks_file is None:
126
+ chunks_file = (
127
+ Path(settings.root_dir)
128
+ / "data"
129
+ / "processed"
130
+ / f"chunks_{settings.dataset_split}.json"
131
+ )
132
  else:
133
  chunks_file = Path(chunks_file)
134
 
135
  if not chunks_file.exists():
136
  raise FileNotFoundError(f"Chunks file not found: {chunks_file}")
137
 
138
+ # Use the module-level singleton so in-memory Qdrant is shared
139
+ # between the indexer and the application runtime.
140
  if create_collection:
141
+ vector_size = getattr(encoder, "embedding_dim", 384)
142
+ qdrant_service.create_collection(vector_size=vector_size)
143
 
144
  logger.info(f"Initializing Sparse Encoder: {settings.sparse_embedding_model_name}")
145
+ sparse_encoder = SparseTextEmbedding(
146
+ model_name=settings.sparse_embedding_model_name
147
+ )
148
 
149
  logger.info("Processing chunks in streaming batches to manage memory...")
150
  total_uploaded = 0
151
  batch_num = 0
152
 
153
+ for batch_chunks in load_chunks_generator(
154
+ chunks_file, batch_size=process_batch_size
155
+ ):
156
  batch_num += 1
157
  batch_start = (batch_num - 1) * process_batch_size
158
  batch_end = batch_start + len(batch_chunks)
159
 
160
+ logger.info(
161
+ f"--- Processing Batch {batch_num} (Chunks {batch_start}-{batch_end}) ---"
162
+ )
163
 
164
  batch_chunks = embed_chunks(
165
  chunks=batch_chunks,
 
178
  batch_uploaded = index_chunks_to_qdrant(
179
  chunks=batch_chunks,
180
  sparse_embeddings=batch_sparse,
 
181
  batch_size=upload_batch_size,
182
  show_progress=True,
183
  )
184
  total_uploaded += batch_uploaded
185
 
186
+ logger.success(
187
+ f"Batch {batch_num} complete: {batch_uploaded} chunks uploaded (Total: {total_uploaded})"
188
+ )
189
 
190
  logger.info("Getting final statistics...")
191
  collection_info = qdrant_service.get_collection_info()