minh-4T commited on
Commit
cb01350
·
1 Parent(s): f57df7c

clean code

Browse files
core/config.py CHANGED
@@ -9,7 +9,7 @@ try:
9
  except Exception:
10
  pass
11
 
12
-
13
  def _is_hf_persistent_storage_available() -> bool:
14
  data_dir = Path('/data')
15
  return data_dir.exists() and os.access(data_dir, os.W_OK)
@@ -17,13 +17,13 @@ def _is_hf_persistent_storage_available() -> bool:
17
 
18
  _USE_HF_PERSISTENT_STORAGE = _is_hf_persistent_storage_available()
19
 
20
-
21
  def _default_documents_db_url() -> str:
22
  if _USE_HF_PERSISTENT_STORAGE:
23
  return 'sqlite:////data/rag_metadata.db'
24
  return 'sqlite:///./rag_metadata.db'
25
 
26
-
27
  def _bounded_int_from_env(name: str, default: int, minimum: int, maximum: int) -> int:
28
  raw_value = os.getenv(name, str(default))
29
  try:
@@ -48,7 +48,6 @@ CHUNK_OVERLAP = int(os.getenv('CHUNK_OVERLAP', '150'))
48
  TOP_K_RESULTS = int(os.getenv('TOP_K_RESULTS', '10'))
49
  FINAL_TOP_K = int(os.getenv('FINAL_TOP_K', '5'))
50
 
51
- QDRANT_COLLECTION = os.getenv('QDRANT_COLLECTION', 'rag_docs')
52
  DOCUMENTS_DATABASE_URL = os.getenv('DOCUMENTS_DATABASE_URL', _default_documents_db_url())
53
 
54
  # External service configs
 
9
  except Exception:
10
  pass
11
 
12
+ #Kiểm tra xem có thư mục /data không và có quyền ghi hay không để quyết định sử dụng persistent storage của Hugging Face hay không
13
  def _is_hf_persistent_storage_available() -> bool:
14
  data_dir = Path('/data')
15
  return data_dir.exists() and os.access(data_dir, os.W_OK)
 
17
 
18
  _USE_HF_PERSISTENT_STORAGE = _is_hf_persistent_storage_available()
19
 
20
+ # Nếu sử dụng persistent storage của Hugging Face, lưu trữ metadata vào /data/rag_metadata.db, ngược lại lưu vào thư mục hiện tại
21
  def _default_documents_db_url() -> str:
22
  if _USE_HF_PERSISTENT_STORAGE:
23
  return 'sqlite:////data/rag_metadata.db'
24
  return 'sqlite:///./rag_metadata.db'
25
 
26
+ # Hàm tiện ích để lấy giá trị int từ biến môi trường với giới hạn min và max, trả về default nếu không hợp lệ
27
  def _bounded_int_from_env(name: str, default: int, minimum: int, maximum: int) -> int:
28
  raw_value = os.getenv(name, str(default))
29
  try:
 
48
  TOP_K_RESULTS = int(os.getenv('TOP_K_RESULTS', '10'))
49
  FINAL_TOP_K = int(os.getenv('FINAL_TOP_K', '5'))
50
 
 
51
  DOCUMENTS_DATABASE_URL = os.getenv('DOCUMENTS_DATABASE_URL', _default_documents_db_url())
52
 
53
  # External service configs
rag/collection_router_retriever.py CHANGED
@@ -1,6 +1,7 @@
1
  import hashlib
2
  import logging
3
  from typing import List
 
4
 
5
  from langchain_core.documents import Document as LangChainDocument
6
  from rank_bm25 import BM25Okapi
@@ -37,7 +38,8 @@ class CollectionRouterRetriever:
37
  self.embeddings_model = embeddings_model
38
  self.top_n_collections = max(1, int(top_n_collections or 3))
39
  # Cache giờ đây lưu một dict: { 'bm25': obj, 'corpus_docs': list, 'count': int }
40
- self._bm25_cache = {}
 
41
 
42
  @staticmethod
43
  def _doc_key(doc) -> str:
@@ -69,18 +71,19 @@ class CollectionRouterRetriever:
69
 
70
  normalized_cohort = (cohort_key or "").strip()
71
  if normalized_cohort:
72
- return [
73
  collection_name
74
  for collection_name in active_collections
75
  if collection_matches_cohort(collection_name, normalized_cohort)
76
  ]
 
77
 
78
  return active_collections[: self.top_n_collections]
79
 
80
  def _ensure_bm25_loaded(self, collection_name: str) -> tuple[BM25Okapi, List[LangChainDocument]] | None:
81
  """Lazy load and cache BM25 index and corpus for a collection (với cơ chế tự động làm mới Cache)"""
82
 
83
- # 1. Lấy tổng số chunks hiện tại trong Qdrant (Rất nhanh, tốn < 10ms)
84
  try:
85
  collection_info = self.qdrant_client.get_collection(collection_name)
86
  current_count = collection_info.points_count
@@ -89,10 +92,11 @@ class CollectionRouterRetriever:
89
  return None
90
 
91
  # 2. Kiểm tra Cache: Nếu chưa có hoặc số lượng thay đổi -> Xóa cache build lại
92
- cached_data = self._bm25_cache.get(collection_name)
93
- if cached_data and cached_data.get('count') == current_count:
94
- # Tái sử dụng (Phải trả về cả bm25 VÀ corpus_docs để map điểm)
95
- return cached_data['bm25'], cached_data['corpus_docs']
 
96
 
97
  logger.info(f"Phát hiện dữ liệu mới hoặc chưa có cache cho {collection_name} (Count: {current_count}). Đang build lại BM25...")
98
 
@@ -147,11 +151,12 @@ class CollectionRouterRetriever:
147
  bm25 = BM25Okapi(tokenized_docs, k1=1.5, b=0.5)
148
 
149
  # 3. Lưu lại Cache kèm the con số count và corpus_docs để đối chiếu lần sau
150
- self._bm25_cache[collection_name] = {
151
- 'bm25': bm25,
152
- 'corpus_docs': corpus_docs,
153
- 'count': current_count
154
- }
 
155
  logger.info("BM25 index built and cached for collection=%s (docs=%d)", collection_name, len(corpus_docs))
156
 
157
  return bm25, corpus_docs
 
1
  import hashlib
2
  import logging
3
  from typing import List
4
+ from threading import Lock
5
 
6
  from langchain_core.documents import Document as LangChainDocument
7
  from rank_bm25 import BM25Okapi
 
38
  self.embeddings_model = embeddings_model
39
  self.top_n_collections = max(1, int(top_n_collections or 3))
40
  # Cache giờ đây lưu một dict: { 'bm25': obj, 'corpus_docs': list, 'count': int }
41
+ self._bm25_cache = {}
42
+ self._bm25_lock = Lock() # Thread-safe lock cho BM25 cache
43
 
44
  @staticmethod
45
  def _doc_key(doc) -> str:
 
71
 
72
  normalized_cohort = (cohort_key or "").strip()
73
  if normalized_cohort:
74
+ matches = [
75
  collection_name
76
  for collection_name in active_collections
77
  if collection_matches_cohort(collection_name, normalized_cohort)
78
  ]
79
+ return matches[:1] if matches else []
80
 
81
  return active_collections[: self.top_n_collections]
82
 
83
  def _ensure_bm25_loaded(self, collection_name: str) -> tuple[BM25Okapi, List[LangChainDocument]] | None:
84
  """Lazy load and cache BM25 index and corpus for a collection (với cơ chế tự động làm mới Cache)"""
85
 
86
+ # 1. Lấy tổng số chunks hiện tại trong Qdrant
87
  try:
88
  collection_info = self.qdrant_client.get_collection(collection_name)
89
  current_count = collection_info.points_count
 
92
  return None
93
 
94
  # 2. Kiểm tra Cache: Nếu chưa có hoặc số lượng thay đổi -> Xóa cache build lại
95
+ with self._bm25_lock:
96
+ cached_data = self._bm25_cache.get(collection_name)
97
+ if cached_data and cached_data.get('count') == current_count:
98
+ # Tái sử dụng (Phải trả về cả bm25 corpus_docs để map điểm)
99
+ return cached_data['bm25'], cached_data['corpus_docs']
100
 
101
  logger.info(f"Phát hiện dữ liệu mới hoặc chưa có cache cho {collection_name} (Count: {current_count}). Đang build lại BM25...")
102
 
 
151
  bm25 = BM25Okapi(tokenized_docs, k1=1.5, b=0.5)
152
 
153
  # 3. Lưu lại Cache kèm the con số count và corpus_docs để đối chiếu lần sau
154
+ with self._bm25_lock:
155
+ self._bm25_cache[collection_name] = {
156
+ 'bm25': bm25,
157
+ 'corpus_docs': corpus_docs,
158
+ 'count': current_count
159
+ }
160
  logger.info("BM25 index built and cached for collection=%s (docs=%d)", collection_name, len(corpus_docs))
161
 
162
  return bm25, corpus_docs
rag/collection_utils.py CHANGED
@@ -2,8 +2,6 @@ import re
2
  from typing import Set
3
 
4
  _COLLECTION_SAFE_RE = re.compile(r"[^a-z0-9_]+")
5
- _YEAR_PATTERN = re.compile(r"(20\d{2})")
6
-
7
 
8
  def normalize_folder_key(folder_key: str) -> str:
9
  value = (folder_key or "").strip().lower()
@@ -20,10 +18,6 @@ def build_collection_name(folder_key: str, prefix: str = "rag") -> str:
20
  return base[:63]
21
 
22
 
23
- def extract_year_tokens(value: str) -> Set[str]:
24
- return {token for token in _YEAR_PATTERN.findall(value or "")}
25
-
26
-
27
  def extract_folder_key_from_collection_name(collection_name: str, prefix: str = "rag") -> str | None:
28
  """
29
  Extract folder_key from collection name.
@@ -53,19 +47,3 @@ def collection_matches_cohort(collection_name: str, cohort_key: str, prefix: str
53
  return False
54
 
55
  return extracted.lower() == cohort_key.lower()
56
-
57
-
58
- def collection_matches_year(collection_name: str, year_scope: str) -> bool:
59
- if not year_scope:
60
- return False
61
-
62
- collection_years = extract_year_tokens(collection_name)
63
- target_years = extract_year_tokens(year_scope)
64
- if not target_years:
65
- return False
66
-
67
- # For explicit ranges (e.g. 2022-2023), require all years to match.
68
- if len(target_years) >= 2:
69
- return target_years.issubset(collection_years)
70
-
71
- return bool(collection_years.intersection(target_years))
 
2
  from typing import Set
3
 
4
  _COLLECTION_SAFE_RE = re.compile(r"[^a-z0-9_]+")
 
 
5
 
6
  def normalize_folder_key(folder_key: str) -> str:
7
  value = (folder_key or "").strip().lower()
 
18
  return base[:63]
19
 
20
 
 
 
 
 
21
  def extract_folder_key_from_collection_name(collection_name: str, prefix: str = "rag") -> str | None:
22
  """
23
  Extract folder_key from collection name.
 
47
  return False
48
 
49
  return extracted.lower() == cohort_key.lower()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
rag/qa_pipeline.py CHANGED
@@ -17,17 +17,6 @@ MAX_CONTEXT_CHARS = 12000
17
  MAX_DOC_CHARS = 1800
18
  MAX_OUT_CHARS = 3000
19
 
20
- # Quản lý API Keys cho Groq và Gemini với xoay tua tự động khi gặp lỗi hoặc hết hạn
21
-
22
-
23
- def sanitize_for_prompt(text: str) -> str:
24
- """Lọc bỏ prompt injection và PII """
25
- text = re.sub(r"(?i)(ignore previous instructions|system prompt|developer message|jailbreak)", "[FILTERED_INJECTION]", text)
26
- text = re.sub(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", "[EMAIL]", text)
27
- text = re.sub(r"\b(0\d{9}|\+84\d{9,10})\b", "[PHONE]", text)
28
- text = re.sub(r"\b\d{8,12}\b", "[ID]", text)
29
- return text.strip()
30
-
31
  def generate_standalone_query(message: str, history: List) -> str:
32
  """Tái tạo câu hỏi từ lịch sử """
33
  if not history:
 
17
  MAX_DOC_CHARS = 1800
18
  MAX_OUT_CHARS = 3000
19
 
 
 
 
 
 
 
 
 
 
 
 
20
  def generate_standalone_query(message: str, history: List) -> str:
21
  """Tái tạo câu hỏi từ lịch sử """
22
  if not history:
services/document_ingest_service.py CHANGED
@@ -223,8 +223,7 @@ def process_document_ingest(
223
 
224
  if not vectors or not vectors[0]:
225
  raise ValueError("Failed to create embeddings for chunks.")
226
-
227
- target_collection = (collection_name or document.collection_name or QDRANT_COLLECTION or "").strip()
228
  if not target_collection:
229
  raise ValueError("Target collection is empty.")
230
 
 
223
 
224
  if not vectors or not vectors[0]:
225
  raise ValueError("Failed to create embeddings for chunks.")
226
+ target_collection = (collection_name or document.collection_name or "rag_docs" or "").strip()
 
227
  if not target_collection:
228
  raise ValueError("Target collection is empty.")
229