TilanB commited on
Commit
b5d7f6d
·
verified ·
1 Parent(s): 33bc5e4
Files changed (1) hide show
  1. search_engine/indexer.py +76 -23
search_engine/indexer.py CHANGED
@@ -12,6 +12,7 @@ import time
12
  import hashlib
13
  import os
14
  import json
 
15
 
16
  from langchain_core.documents import Document
17
  from langchain_core.retrievers import BaseRetriever
@@ -24,12 +25,18 @@ from configuration.parameters import parameters
24
 
25
  logger = logging.getLogger(__name__)
26
 
 
 
 
27
 
28
  def doc_id(doc) -> str:
 
29
  src = doc.metadata.get("source", "")
30
  page = doc.metadata.get("page", "")
31
  chunk = doc.metadata.get("chunk_id", "")
32
- base = f"{src}::{page}::{chunk}"
 
 
33
  return hashlib.sha256(base.encode("utf-8")).hexdigest()
34
 
35
 
@@ -38,15 +45,28 @@ def content_hash(doc) -> str:
38
 
39
 
40
  def load_manifest(path):
 
41
  if os.path.exists(path):
42
- with open(path, "r") as f:
43
- return json.load(f)
 
 
 
 
44
  return {}
45
 
46
 
47
  def save_manifest(path, manifest):
48
- with open(path, "w") as f:
49
- json.dump(manifest, f)
 
 
 
 
 
 
 
 
50
 
51
 
52
  class EnsembleRetriever(BaseRetriever):
@@ -132,12 +152,13 @@ class RetrieverBuilder:
132
  m.update(str(v).encode('utf-8'))
133
  return m.hexdigest()
134
 
135
- def build_hybrid_retriever(self, docs) -> EnsembleRetriever:
136
  """
137
  Build hybrid retriever using BM25 and vector search.
138
 
139
  Args:
140
  docs: List of documents to index
 
141
 
142
  Returns:
143
  EnsembleRetriever combining BM25 and vector search
@@ -145,18 +166,31 @@ class RetrieverBuilder:
145
  logger.info(f"Building hybrid retriever with {len(docs)} documents...")
146
  if not docs:
147
  raise ValueError("No documents provided")
148
- chroma_dir = parameters.CHROMA_DB_PATH
 
 
 
 
 
 
149
  manifest_path = os.path.join(chroma_dir, "indexed_manifest.json")
150
  os.makedirs(chroma_dir, exist_ok=True)
151
- manifest = load_manifest(manifest_path)
 
 
 
 
 
152
  vector_store = Chroma(
153
  embedding_function=self.embeddings,
154
  persist_directory=chroma_dir,
155
  )
 
156
  to_add = []
157
  ids_to_add = []
158
  to_delete_ids = []
159
  current_ids = set()
 
160
  for d in docs:
161
  _id = doc_id(d)
162
  _hash = content_hash(d)
@@ -170,6 +204,7 @@ class RetrieverBuilder:
170
  to_add.append(d)
171
  ids_to_add.append(_id)
172
  manifest[_id] = _hash
 
173
  if to_add:
174
  # Safety net: de-dupe before add_documents
175
  seen = set()
@@ -180,20 +215,35 @@ class RetrieverBuilder:
180
  seen.add(_id)
181
  uniq_docs.append(doc)
182
  uniq_ids.append(_id)
183
- # Debugging: show duplicate IDs and their sources
184
- from collections import Counter
185
- counts = Counter(ids_to_add)
186
- dupes = [i for i, c in counts.items() if c > 1]
187
- if dupes:
188
- print("Duplicate IDs:", len(dupes))
189
- for d in dupes[:10]:
190
- idxs = [k for k, x in enumerate(ids_to_add) if x == d]
191
- print("ID:", d, "examples:")
192
- for k in idxs[:3]:
193
- md = to_add[k].metadata
194
- print(" ", md.get("source"), md.get("page"), md.get("chunk_index"))
195
- vector_store.add_documents(uniq_docs, ids=uniq_ids)
196
- save_manifest(manifest_path, manifest)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
197
  # Create BM25 retriever
198
  t_bm25_start = time.time()
199
  texts = [doc.page_content for doc in docs]
@@ -203,6 +253,7 @@ class RetrieverBuilder:
203
  t_bm25_end = time.time()
204
  logger.info(f"[PROFILE] BM25 retriever creation: {t_bm25_end - t_bm25_start:.2f}s")
205
  logger.debug(f"BM25 indexed {len(texts)} texts, k={bm25_retriever.k}")
 
206
  t_vec_retr_start = time.time()
207
  vector_retriever = vector_store.as_retriever(
208
  search_type="mmr",
@@ -215,6 +266,7 @@ class RetrieverBuilder:
215
  t_vec_retr_end = time.time()
216
  logger.info(f"[PROFILE] Vector retriever creation: {t_vec_retr_end - t_vec_retr_start:.2f}s")
217
  logger.debug("Vector retriever created")
 
218
  t_ensemble_start = time.time()
219
  hybrid_retriever = EnsembleRetriever(
220
  retrievers=[bm25_retriever, vector_retriever],
@@ -224,5 +276,6 @@ class RetrieverBuilder:
224
  t_ensemble_end = time.time()
225
  logger.info(f"[PROFILE] Ensemble retriever creation: {t_ensemble_end - t_ensemble_start:.2f}s")
226
  logger.info(f"Hybrid retriever created (k={parameters.VECTOR_SEARCH_K})")
227
- logger.info(f"[PROFILE] Total hybrid retriever build: {t_ensemble_end - t_bm25_start:.2f}s")
 
228
  return hybrid_retriever
 
12
  import hashlib
13
  import os
14
  import json
15
+ import threading
16
 
17
  from langchain_core.documents import Document
18
  from langchain_core.retrievers import BaseRetriever
 
25
 
26
  logger = logging.getLogger(__name__)
27
 
28
+ # Thread lock for manifest file access
29
+ _manifest_lock = threading.Lock()
30
+
31
 
32
  def doc_id(doc) -> str:
33
+ """Generate a unique ID for a document based on source, page, chunk_id, and content hash."""
34
  src = doc.metadata.get("source", "")
35
  page = doc.metadata.get("page", "")
36
  chunk = doc.metadata.get("chunk_id", "")
37
+ # Include content hash to ensure uniqueness even if chunk_id is missing
38
+ content = hashlib.sha256(doc.page_content.encode("utf-8")).hexdigest()[:16]
39
+ base = f"{src}::{page}::{chunk}::{content}"
40
  return hashlib.sha256(base.encode("utf-8")).hexdigest()
41
 
42
 
 
45
 
46
 
47
  def load_manifest(path):
48
+ """Thread-safe manifest loading."""
49
  if os.path.exists(path):
50
+ try:
51
+ with open(path, "r") as f:
52
+ return json.load(f)
53
+ except (json.JSONDecodeError, IOError) as e:
54
+ logger.warning(f"Failed to load manifest, starting fresh: {e}")
55
+ return {}
56
  return {}
57
 
58
 
59
  def save_manifest(path, manifest):
60
+ """Thread-safe manifest saving with atomic write."""
61
+ temp_path = path + ".tmp"
62
+ try:
63
+ with open(temp_path, "w") as f:
64
+ json.dump(manifest, f)
65
+ os.replace(temp_path, path) # Atomic rename
66
+ except Exception as e:
67
+ logger.error(f"Failed to save manifest: {e}")
68
+ if os.path.exists(temp_path):
69
+ os.remove(temp_path)
70
 
71
 
72
  class EnsembleRetriever(BaseRetriever):
 
152
  m.update(str(v).encode('utf-8'))
153
  return m.hexdigest()
154
 
155
+ def build_hybrid_retriever(self, docs, session_id: str = None) -> EnsembleRetriever:
156
  """
157
  Build hybrid retriever using BM25 and vector search.
158
 
159
  Args:
160
  docs: List of documents to index
161
+ session_id: Optional session ID for user isolation (recommended for multi-user)
162
 
163
  Returns:
164
  EnsembleRetriever combining BM25 and vector search
 
166
  logger.info(f"Building hybrid retriever with {len(docs)} documents...")
167
  if not docs:
168
  raise ValueError("No documents provided")
169
+
170
+ # Use session-specific directory if provided (for multi-user isolation)
171
+ if session_id:
172
+ chroma_dir = os.path.join(parameters.CHROMA_DB_PATH, f"session_{session_id}")
173
+ else:
174
+ chroma_dir = parameters.CHROMA_DB_PATH
175
+
176
  manifest_path = os.path.join(chroma_dir, "indexed_manifest.json")
177
  os.makedirs(chroma_dir, exist_ok=True)
178
+
179
+ # Thread-safe manifest access
180
+ with _manifest_lock:
181
+ manifest = load_manifest(manifest_path)
182
+
183
+ t_vector_start = time.time()
184
  vector_store = Chroma(
185
  embedding_function=self.embeddings,
186
  persist_directory=chroma_dir,
187
  )
188
+
189
  to_add = []
190
  ids_to_add = []
191
  to_delete_ids = []
192
  current_ids = set()
193
+
194
  for d in docs:
195
  _id = doc_id(d)
196
  _hash = content_hash(d)
 
204
  to_add.append(d)
205
  ids_to_add.append(_id)
206
  manifest[_id] = _hash
207
+
208
  if to_add:
209
  # Safety net: de-dupe before add_documents
210
  seen = set()
 
215
  seen.add(_id)
216
  uniq_docs.append(doc)
217
  uniq_ids.append(_id)
218
+
219
+ # Log duplicate count for debugging
220
+ dupe_count = len(to_add) - len(uniq_docs)
221
+ if dupe_count > 0:
222
+ logger.debug(f"Filtered {dupe_count} duplicate documents before indexing")
223
+
224
+ # Batch add documents for better performance
225
+ logger.info(f"[PROFILE] Adding {len(uniq_docs)} new documents to vector store...")
226
+ t_add_start = time.time()
227
+
228
+ # Add in batches for progress tracking and memory efficiency
229
+ batch_size = 100
230
+ for i in range(0, len(uniq_docs), batch_size):
231
+ batch_docs = uniq_docs[i:i+batch_size]
232
+ batch_ids = uniq_ids[i:i+batch_size]
233
+ vector_store.add_documents(batch_docs, ids=batch_ids)
234
+ if len(uniq_docs) > batch_size:
235
+ logger.debug(f"[PROFILE] Indexed batch {i//batch_size + 1}/{(len(uniq_docs)-1)//batch_size + 1}")
236
+
237
+ t_add_end = time.time()
238
+ logger.info(f"[PROFILE] Vector store add_documents: {t_add_end - t_add_start:.2f}s")
239
+
240
+ t_vector_end = time.time()
241
+ logger.info(f"[PROFILE] Total vector store setup: {t_vector_end - t_vector_start:.2f}s")
242
+
243
+ # Thread-safe manifest save
244
+ with _manifest_lock:
245
+ save_manifest(manifest_path, manifest)
246
+
247
  # Create BM25 retriever
248
  t_bm25_start = time.time()
249
  texts = [doc.page_content for doc in docs]
 
253
  t_bm25_end = time.time()
254
  logger.info(f"[PROFILE] BM25 retriever creation: {t_bm25_end - t_bm25_start:.2f}s")
255
  logger.debug(f"BM25 indexed {len(texts)} texts, k={bm25_retriever.k}")
256
+
257
  t_vec_retr_start = time.time()
258
  vector_retriever = vector_store.as_retriever(
259
  search_type="mmr",
 
266
  t_vec_retr_end = time.time()
267
  logger.info(f"[PROFILE] Vector retriever creation: {t_vec_retr_end - t_vec_retr_start:.2f}s")
268
  logger.debug("Vector retriever created")
269
+
270
  t_ensemble_start = time.time()
271
  hybrid_retriever = EnsembleRetriever(
272
  retrievers=[bm25_retriever, vector_retriever],
 
276
  t_ensemble_end = time.time()
277
  logger.info(f"[PROFILE] Ensemble retriever creation: {t_ensemble_end - t_ensemble_start:.2f}s")
278
  logger.info(f"Hybrid retriever created (k={parameters.VECTOR_SEARCH_K})")
279
+ logger.info(f"[PROFILE] Total hybrid retriever build: {t_ensemble_end - t_vector_start:.2f}s")
280
+
281
  return hybrid_retriever