Nguyễn Quốc Vỹ commited on
Commit
47738d8
·
1 Parent(s): ab66a23

Tối ưu logic index: chỉ index chunk mới, bỏ qua tài liệu/chunk đã tồn tại

Browse files
backend/admin_services.py CHANGED
@@ -17,6 +17,8 @@ from backend import db
17
  from backend.auth import is_admin
18
  from backend.runtime_paths import PDF_DIR
19
  from backend.db_sync import schedule_pdf_upload, schedule_pdf_delete, schedule_vector_sync
 
 
20
 
21
 
22
  # ======================== UserService ========================
@@ -109,8 +111,10 @@ def create_system_doc(file_path: str, filename: str = None) -> tuple[bool, str]:
109
  else:
110
  shutil.copy2(file_path, dest)
111
  db.insert_tai_lieu_he_thong(ma_tai_lieu=ma, ten_file=filename, duong_dan=os.path.abspath(dest))
 
 
112
  schedule_pdf_upload(dest, filename)
113
- return True, "Đã thêm tài liệu. Hãy chạy 'Tái lập chỉ mục' để cập nhật RAG."
114
  except Exception as e:
115
  return False, str(e)
116
 
@@ -133,8 +137,15 @@ def update_system_doc(ma_tai_lieu: str, file_path: str = None, ten_file: str = N
133
  return False, str(e)
134
  # Cập nhật tên trong DB nếu đổi tên (cần hàm update trong db - hiện chỉ có insert upsert)
135
  db.insert_tai_lieu_he_thong(ma_tai_lieu=ma_tai_lieu, ten_file=new_name, duong_dan=old_path)
 
 
 
 
 
 
 
136
  schedule_pdf_upload(old_path, new_name)
137
- return True, "Đã cập nhật. Chạy 'Tái lập chỉ mục' nếu đã thay file."
138
 
139
 
140
  def delete_system_doc(ma_tai_lieu: str) -> tuple[bool, str]:
 
17
  from backend.auth import is_admin
18
  from backend.runtime_paths import PDF_DIR
19
  from backend.db_sync import schedule_pdf_upload, schedule_pdf_delete, schedule_vector_sync
20
+ from data_processing.dynamic_indexing import add_pdf_file
21
+ from data_processing.indexing import delete_chunks_by_source
22
 
23
 
24
  # ======================== UserService ========================
 
111
  else:
112
  shutil.copy2(file_path, dest)
113
  db.insert_tai_lieu_he_thong(ma_tai_lieu=ma, ten_file=filename, duong_dan=os.path.abspath(dest))
114
+ # Index ngay sau khi upload để hỏi đáp dùng được luôn.
115
+ chunks_added = add_pdf_file(dest)
116
  schedule_pdf_upload(dest, filename)
117
+ return True, f"Đã thêm tài liệu index {chunks_added} chunks."
118
  except Exception as e:
119
  return False, str(e)
120
 
 
137
  return False, str(e)
138
  # Cập nhật tên trong DB nếu đổi tên (cần hàm update trong db - hiện chỉ có insert upsert)
139
  db.insert_tai_lieu_he_thong(ma_tai_lieu=ma_tai_lieu, ten_file=new_name, duong_dan=old_path)
140
+ # Khi thay file, cần cập nhật vector ngay để tránh dùng dữ liệu cũ.
141
+ if file_path:
142
+ delete_chunks_by_source(new_name)
143
+ chunks_added = add_pdf_file(old_path)
144
+ msg = f"Đã cập nhật và index lại {chunks_added} chunks."
145
+ else:
146
+ msg = "Đã cập nhật metadata tài liệu."
147
  schedule_pdf_upload(old_path, new_name)
148
+ return True, msg
149
 
150
 
151
  def delete_system_doc(ma_tai_lieu: str) -> tuple[bool, str]:
backend/rag_chain_pg.py CHANGED
@@ -1058,6 +1058,8 @@ def process_uploaded_pdf(uploaded_file, user_id=None) -> dict:
1058
  documents = [{"content": text.strip(), "source": filename}]
1059
  chunks_added = add_new_documents(documents)
1060
  print(f"[PDF] ✅ Indexed {chunks_added} chunks from {filename}")
 
 
1061
 
1062
  if user_id:
1063
  ma_tai_lieu = str(uuid.uuid4())
@@ -1074,6 +1076,7 @@ def process_uploaded_pdf(uploaded_file, user_id=None) -> dict:
1074
  "filename": filename,
1075
  "text": text,
1076
  "chunks_count": chunks_added,
 
1077
  }
1078
 
1079
  except Exception as e:
 
1058
  documents = [{"content": text.strip(), "source": filename}]
1059
  chunks_added = add_new_documents(documents)
1060
  print(f"[PDF] ✅ Indexed {chunks_added} chunks from {filename}")
1061
+ global _source_cache
1062
+ _source_cache = None
1063
 
1064
  if user_id:
1065
  ma_tai_lieu = str(uuid.uuid4())
 
1076
  "filename": filename,
1077
  "text": text,
1078
  "chunks_count": chunks_added,
1079
+ "already_indexed": False,
1080
  }
1081
 
1082
  except Exception as e:
data_processing/dynamic_indexing.py CHANGED
@@ -70,19 +70,46 @@ def add_new_documents(documents: list) -> int:
70
 
71
  batch_size = 500
72
  total = len(documents_list)
 
 
73
  for start in range(0, total, batch_size):
74
  end = min(start + batch_size, total)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
  collection.upsert(
76
- documents=documents_list[start:end],
77
- metadatas=metadatas_list[start:end],
78
- ids=ids_list[start:end],
79
  )
 
80
 
81
  embedding_fn.set_mode("query")
82
 
83
- print(f"✅ Đã thêm {total} chunks mới vào ChromaDB")
 
 
84
  print(f"📊 Tổng chunks hiện tại: {collection.count()}")
85
- return total
86
 
87
 
88
  def add_pdf_file(filepath: str) -> int:
 
70
 
71
  batch_size = 500
72
  total = len(documents_list)
73
+ skipped_existing = 0
74
+ inserted_new = 0
75
  for start in range(0, total, batch_size):
76
  end = min(start + batch_size, total)
77
+ batch_ids = ids_list[start:end]
78
+ existing = collection.get(ids=batch_ids, include=[])
79
+ existing_ids = set(existing.get("ids", []) if existing else [])
80
+
81
+ filtered_docs = []
82
+ filtered_metas = []
83
+ filtered_ids = []
84
+ for doc, meta, chunk_id in zip(
85
+ documents_list[start:end],
86
+ metadatas_list[start:end],
87
+ batch_ids,
88
+ ):
89
+ if chunk_id in existing_ids:
90
+ skipped_existing += 1
91
+ continue
92
+ filtered_docs.append(doc)
93
+ filtered_metas.append(meta)
94
+ filtered_ids.append(chunk_id)
95
+
96
+ if not filtered_ids:
97
+ continue
98
+
99
  collection.upsert(
100
+ documents=filtered_docs,
101
+ metadatas=filtered_metas,
102
+ ids=filtered_ids,
103
  )
104
+ inserted_new += len(filtered_ids)
105
 
106
  embedding_fn.set_mode("query")
107
 
108
+ print(f"✅ Đã thêm {inserted_new} chunks mới vào ChromaDB")
109
+ if skipped_existing:
110
+ print(f"⏭️ Bỏ qua {skipped_existing} chunks đã tồn tại")
111
  print(f"📊 Tổng chunks hiện tại: {collection.count()}")
112
+ return inserted_new
113
 
114
 
115
  def add_pdf_file(filepath: str) -> int:
data_processing/indexing.py CHANGED
@@ -157,11 +157,10 @@ def create_vector_database(chunks: List[Dict]):
157
  print("❌ Không có chunks để index!")
158
  return
159
 
 
160
  embedding_fn = get_embedding_function()
161
  embedding_fn.set_mode("passage")
162
 
163
- collection = get_collection()
164
-
165
  documents = []
166
  metadatas = []
167
  ids = []
@@ -191,19 +190,46 @@ def create_vector_database(chunks: List[Dict]):
191
 
192
  batch_size = 500
193
  total = len(documents)
 
 
194
 
195
  for start in range(0, total, batch_size):
196
  end = min(start + batch_size, total)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
197
  collection.upsert(
198
- documents=documents[start:end],
199
- metadatas=metadatas[start:end],
200
- ids=ids[start:end]
201
  )
202
- print(f" ✅ Đã index {end}/{total} chunks")
 
203
 
204
  embedding_fn.set_mode("query")
205
 
206
- print(f"\n✅ Tổng cộng {total} chunks đã được index vào ChromaDB")
 
 
207
  print(f"📁 Dữ liệu lưu tại: {CHROMA_PERSIST_DIR}")
208
  print(f"🧠 Embedding model: {EMBEDDING_MODEL}")
209
 
@@ -215,6 +241,8 @@ def search(query: str, top_k: int = 5, max_distance: float = 0.8) -> List[Dict]:
215
  max_distance: ngưỡng tối đa, chỉ trả về kết quả có distance < max_distance.
216
  """
217
  collection = get_collection()
 
 
218
 
219
  if collection.count() == 0:
220
  print("[Search] ⚠️ Database rỗng! Chạy run_pipeline.py trước.")
 
157
  print("❌ Không có chunks để index!")
158
  return
159
 
160
+ collection = get_collection()
161
  embedding_fn = get_embedding_function()
162
  embedding_fn.set_mode("passage")
163
 
 
 
164
  documents = []
165
  metadatas = []
166
  ids = []
 
190
 
191
  batch_size = 500
192
  total = len(documents)
193
+ skipped_existing = 0
194
+ inserted_new = 0
195
 
196
  for start in range(0, total, batch_size):
197
  end = min(start + batch_size, total)
198
+ batch_ids = ids[start:end]
199
+ existing = collection.get(ids=batch_ids, include=[])
200
+ existing_ids = set(existing.get("ids", []) if existing else [])
201
+
202
+ filtered_docs = []
203
+ filtered_metas = []
204
+ filtered_ids = []
205
+ for doc, meta, chunk_id in zip(
206
+ documents[start:end],
207
+ metadatas[start:end],
208
+ batch_ids,
209
+ ):
210
+ if chunk_id in existing_ids:
211
+ skipped_existing += 1
212
+ continue
213
+ filtered_docs.append(doc)
214
+ filtered_metas.append(meta)
215
+ filtered_ids.append(chunk_id)
216
+
217
+ if not filtered_ids:
218
+ continue
219
+
220
  collection.upsert(
221
+ documents=filtered_docs,
222
+ metadatas=filtered_metas,
223
+ ids=filtered_ids
224
  )
225
+ inserted_new += len(filtered_ids)
226
+ print(f" ✅ Đã index mới {inserted_new}/{total} chunks")
227
 
228
  embedding_fn.set_mode("query")
229
 
230
+ print(f"\n✅ Tổng cộng {inserted_new} chunks mới đã được index vào ChromaDB")
231
+ if skipped_existing:
232
+ print(f"⏭️ Bỏ qua {skipped_existing} chunks đã tồn tại")
233
  print(f"📁 Dữ liệu lưu tại: {CHROMA_PERSIST_DIR}")
234
  print(f"🧠 Embedding model: {EMBEDDING_MODEL}")
235
 
 
241
  max_distance: ngưỡng tối đa, chỉ trả về kết quả có distance < max_distance.
242
  """
243
  collection = get_collection()
244
+ # Đảm bảo query luôn dùng đúng prefix "query: "
245
+ get_embedding_function().set_mode("query")
246
 
247
  if collection.count() == 0:
248
  print("[Search] ⚠️ Database rỗng! Chạy run_pipeline.py trước.")