VietCat commited on
Commit
bc83228
·
1 Parent(s): 24543f7

update error handling

Browse files
Files changed (5) hide show
  1. app/constants.py +15 -0
  2. app/gemini_client.py +37 -15
  3. app/llm.py +17 -1
  4. app/message_processor.py +80 -46
  5. app/supabase_db.py +65 -34
app/constants.py CHANGED
@@ -206,5 +206,20 @@ SUMMARY_STATUS_MESSAGES = [
206
  "Mình đang chốt lại các điểm chính để trả lời một cách trọn vẹn"
207
  ]
208
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
209
  SHEET_RANGE = 'chat!A2:N'
210
  VERSION_NUMBER = 123456800
 
206
  "Mình đang chốt lại các điểm chính để trả lời một cách trọn vẹn"
207
  ]
208
 
209
+ # LLM retry wait messages for Facebook notifications when prompt is too large
210
+ LLM_RETRY_WAIT_MESSAGES = [
211
+ "Lượng thông tin cần tổng hợp hơi nhiều, bạn cho mình thêm chút thời gian để xử lý nhé.",
212
+ "Thông tin khá phức tạp, mình cần thêm chút thời gian để phân tích kỹ hơn.",
213
+ "Để đảm bảo câu trả lời chính xác nhất, mình cần xem xét lại các văn bản này cẩn thận hơn. Bạn chờ chút nha.",
214
+ "Mình đang sàng lọc lại các văn bản luật để tìm ra câu trả lời phù hợp nhất. Bạn đợi mình một lát nhé.",
215
+ "Có vẻ như câu hỏi của bạn liên quan đến nhiều quy định, mình cần thêm thời gian để tổng hợp lại.",
216
+ "Để tránh nhầm lẫn, mình đang đối chiếu thông tin từ nhiều nguồn. Sẽ sớm có câu trả lời cho bạn thôi.",
217
+ "Thông tin ban đầu khá rộng, mình đang thu hẹp phạm vi để trả lời chính xác hơn. Bạn chờ mình xíu nha.",
218
+ "Mình đang sắp xếp lại các dữ kiện để câu trả lời được mạch lạc. Cảm ơn bạn đã kiên nhẫn!",
219
+ "Câu hỏi này cần phân tích sâu hơn một chút. Mình sẽ phản hồi ngay khi có kết quả nhé.",
220
+ "Mình đang nỗ lực để đưa ra câu trả lời tốt nhất. Quá trình này có thể mất thêm vài giây, mong bạn thông cảm.",
221
+ "Dữ liệu khá lớn, mình đang tóm tắt lại những điểm chính. Bạn vui lòng đợi trong giây lát."
222
+ ]
223
+
224
  SHEET_RANGE = 'chat!A2:N'
225
  VERSION_NUMBER = 123456800
app/gemini_client.py CHANGED
@@ -2,12 +2,25 @@ from google.generativeai.embedding import embed_content
2
  from google.generativeai.client import configure
3
  from google.generativeai.generative_models import GenerativeModel
4
  from loguru import logger
5
- from .request_limit_manager import RequestLimitManager
6
  from typing import List, Optional
 
 
 
7
  from .utils import (
8
  _safe_truncate
9
  )
10
 
 
 
 
 
 
 
 
 
 
 
 
11
  class GeminiClient:
12
  def __init__(self):
13
  self.limit_manager = RequestLimitManager("gemini")
@@ -56,21 +69,30 @@ class GeminiClient:
56
  _model = self._get_model_instance(key, model)
57
 
58
  response = _model.generate_content(prompt, **kwargs)
59
- self.limit_manager.log_request(key, model, success=True)
60
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61
  if hasattr(response, 'usage_metadata'):
62
  logger.info(f"[GEMINI][USAGE] Prompt Token Count: {response.usage_metadata.prompt_token_count} - Candidate Token Count: {response.usage_metadata.candidates_token_count} - Total Token Count: {response.usage_metadata.total_token_count}")
63
 
64
- if hasattr(response, 'text'):
65
- logger.info(f"[GEMINI][TEXT_RESPONSE] {_safe_truncate(response.text)}")
66
- return response.text
67
- elif hasattr(response, 'candidates') and response.candidates:
68
- logger.info(f"[GEMINI][CANDIDATES_RESPONSE] {_safe_truncate(response.candidates[0].content.parts[0].text)}")
69
- return response.candidates[0].content.parts[0].text
70
-
71
- logger.info(f"[GEMINI][RAW_RESPONSE] {response}")
72
- return str(response)
73
-
74
  except Exception as e:
75
  import re
76
  msg = str(e)
@@ -90,10 +112,10 @@ class GeminiClient:
90
  last_error = e
91
  continue
92
  else:
93
- # Lỗi khác không phải rate limit
 
94
  logger.error(f"[GEMINI] Error generating text: {e}")
95
- last_error = e
96
- break
97
 
98
  raise last_error or RuntimeError("No available Gemini API key/model")
99
 
 
2
  from google.generativeai.client import configure
3
  from google.generativeai.generative_models import GenerativeModel
4
  from loguru import logger
 
5
  from typing import List, Optional
6
+ from google.generativeai.types import GenerationConfig
7
+
8
+ from .request_limit_manager import RequestLimitManager
9
  from .utils import (
10
  _safe_truncate
11
  )
12
 
13
+ class GeminiResponseError(Exception):
14
+ """Custom exception for non-retriable Gemini response issues like safety or token limits."""
15
+ def __init__(self, message, finish_reason=None, usage_metadata=None):
16
+ super().__init__(message)
17
+ self.finish_reason = finish_reason
18
+ self.usage_metadata = usage_metadata
19
+
20
+ def __str__(self):
21
+ usage_str = f"Prompt: {self.usage_metadata.prompt_token_count}, Candidates: {self.usage_metadata.candidates_token_count}, Total: {self.usage_metadata.total_token_count}" if self.usage_metadata else "N/A"
22
+ return f"{super().__str__()} (Finish Reason: {self.finish_reason}, Usage: {usage_str})"
23
+
24
  class GeminiClient:
25
  def __init__(self):
26
  self.limit_manager = RequestLimitManager("gemini")
 
69
  _model = self._get_model_instance(key, model)
70
 
71
  response = _model.generate_content(prompt, **kwargs)
 
72
 
73
+ # Kiểm tra các vấn đề về nội dung (MAX_TOKENS, SAFETY, etc.)
74
+ if not response.candidates or response.candidates[0].finish_reason.name not in ["STOP", "FINISH_REASON_UNSPECIFIED"]:
75
+ finish_reason = response.candidates[0].finish_reason if response.candidates else None
76
+ usage_metadata = response.usage_metadata if hasattr(response, 'usage_metadata') else None
77
+ error_message = f"Gemini response finished with reason: {finish_reason.name if finish_reason else 'UNKNOWN'}."
78
+
79
+ # Đây là lỗi logic, raise để lớp gọi xử lý (ví dụ: retry với prompt ngắn hơn)
80
+ raise GeminiResponseError(
81
+ error_message,
82
+ finish_reason=finish_reason.name if finish_reason else 'UNKNOWN',
83
+ usage_metadata=usage_metadata
84
+ )
85
+
86
+ self.limit_manager.log_request(key, model, success=True)
87
  if hasattr(response, 'usage_metadata'):
88
  logger.info(f"[GEMINI][USAGE] Prompt Token Count: {response.usage_metadata.prompt_token_count} - Candidate Token Count: {response.usage_metadata.candidates_token_count} - Total Token Count: {response.usage_metadata.total_token_count}")
89
 
90
+ logger.info(f"[GEMINI][TEXT_RESPONSE] {_safe_truncate(response.text)}")
91
+ return response.text
92
+ except GeminiResponseError as e:
93
+ # Lỗi nội dung, không thể retry bằng cách đổi key. Propagate lên.
94
+ logger.error(f"[GEMINI] Non-retriable content error: {e}")
95
+ raise e
 
 
 
 
96
  except Exception as e:
97
  import re
98
  msg = str(e)
 
112
  last_error = e
113
  continue
114
  else:
115
+ # Lỗi khác không phải rate limit (vd: timeout, server error)
116
+ # sẽ được propagate lên để lớp llm.py xử lý retry với backoff.
117
  logger.error(f"[GEMINI] Error generating text: {e}")
118
+ raise e
 
119
 
120
  raise last_error or RuntimeError("No available Gemini API key/model")
121
 
app/llm.py CHANGED
@@ -6,10 +6,11 @@ import re
6
  import os
7
  import asyncio
8
 
 
9
  import httpx
10
  from loguru import logger
11
 
12
- from .gemini_client import GeminiClient
13
  from .config import get_settings
14
  from .utils import (
15
  timing_decorator_async,
@@ -18,6 +19,18 @@ from .utils import (
18
  _safe_truncate
19
  )
20
 
 
 
 
 
 
 
 
 
 
 
 
 
21
  def _parse_json_from_text(text: str) -> Optional[Union[List[Dict[str, Any]], Dict[str, Any]]]:
22
  """Best-effort JSON extractor from LLM free-form responses.
23
 
@@ -241,9 +254,12 @@ class LLMClient:
241
  logger.error("HFS API response is None")
242
  raise RuntimeError("HFS API response is None")
243
 
 
244
  async def _generate_gemini(self, prompt: str, **kwargs) -> str:
245
  loop = asyncio.get_event_loop()
246
  # Đảm bảo kwargs được truyền nếu GeminiClient hỗ trợ
 
 
247
  return await loop.run_in_executor(None, lambda: self.gemini_client.generate_text(prompt, **kwargs))
248
 
249
  @timing_decorator_async
 
6
  import os
7
  import asyncio
8
 
9
+ from tenacity import retry, stop_after_attempt, wait_exponential
10
  import httpx
11
  from loguru import logger
12
 
13
+ from .gemini_client import GeminiClient, GeminiResponseError
14
  from .config import get_settings
15
  from .utils import (
16
  timing_decorator_async,
 
19
  _safe_truncate
20
  )
21
 
22
+ # --- Retry decorator cho các lỗi tạm thời của LLM (network, server-side) ---
23
+ retry_on_llm_transient_error = retry(
24
+ stop=stop_after_attempt(4), # 1 lần gọi gốc + 3 lần thử lại
25
+ wait=wait_exponential(multiplier=5, min=10, max=60), # Chờ 10s, 20s, 40s
26
+ # Chỉ retry nếu exception KHÔNG PHẢI là GeminiResponseError (lỗi nội dung)
27
+ retry=lambda e: not isinstance(e, GeminiResponseError),
28
+ before_sleep=lambda retry_state: logger.warning(
29
+ f"[LLM][RETRY] LLM call failed with transient error, retrying... "
30
+ f"Attempt: {retry_state.attempt_number}, Error: {retry_state.outcome.exception()}"
31
+ )
32
+ )
33
+
34
  def _parse_json_from_text(text: str) -> Optional[Union[List[Dict[str, Any]], Dict[str, Any]]]:
35
  """Best-effort JSON extractor from LLM free-form responses.
36
 
 
254
  logger.error("HFS API response is None")
255
  raise RuntimeError("HFS API response is None")
256
 
257
+ @retry_on_llm_transient_error
258
  async def _generate_gemini(self, prompt: str, **kwargs) -> str:
259
  loop = asyncio.get_event_loop()
260
  # Đảm bảo kwargs được truyền nếu GeminiClient hỗ trợ
261
+ # Decorator sẽ xử lý retry cho các lỗi tạm thời (network, server).
262
+ # GeminiResponseError (lỗi nội dung) sẽ được raise lên cho message_processor xử lý.
263
  return await loop.run_in_executor(None, lambda: self.gemini_client.generate_text(prompt, **kwargs))
264
 
265
  @timing_decorator_async
app/message_processor.py CHANGED
@@ -3,9 +3,11 @@ import asyncio
3
  import traceback
4
  import json
5
  from loguru import logger
6
- from .constants import START_SEARCHING_MESSAGES, SUMMARY_STATUS_MESSAGES, PROCESSING_STATUS_MESSAGES, FOUND_REGULATIONS_MESSAGES, BATCH_STATUS_MESSAGES
7
- from .utils import get_random_message
 
8
  from .facebook import FacebookClient
 
9
  from app.config import get_settings
10
  import re
11
 
@@ -236,57 +238,89 @@ class MessageProcessor:
236
  async def format_search_results(self, conversation_context: str, question: str, matches: List[Dict[str, Any]], page_token: str, sender_id: str) -> str:
237
  if not matches:
238
  return "Không tìm thấy kết quả phù hợp."
239
-
240
  asyncio.create_task(self.facebook.send_message(message=get_random_message(FOUND_REGULATIONS_MESSAGES)))
241
-
242
  #TODO: thời gian rerank kéo dài hơn 30s. Tạm thời bỏ qua bước reranking cho đến khi tìm ra phương án optimize
243
  # try:
244
  # reranked = await self.channel.reranker.rerank(question, matches, top_k=10)
245
  # if reranked: matches = reranked
246
  # except Exception as e:
247
  # logger.error(f"[RERANK] Lỗi khi rerank: {e}")
248
-
249
- full_result_text = ""
250
- def arr_to_str(arr, sep=", "):
251
- if not arr: return ""
252
- return sep.join([str(x) for x in arr if x not in (None, "")]) if isinstance(arr, list) else str(arr)
253
-
254
- for i, match in enumerate(matches, 1):
255
- full_result_text += f"\n\n* Nguồn: {(match.get('structure') or '').strip()}:\n"
256
- fullContent = (match.get('fullcontent') or '').strip()
257
- full_result_text += f"{fullContent}"
258
- hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ")
259
- if hpbsnoidung: full_result_text += f"\n- Hình phạt bổ sung: {hpbsnoidung}"
260
- bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ")
261
- if bpkpnoidung: full_result_text += f"\n- Biện pháp khắc phục: {bpkpnoidung}"
262
- if match.get('cr_impounding'): full_result_text += f"\n- Tạm giữ phương tiện: 07 ngày"
263
-
264
- prompt = (
265
- "Bạn một trợ lý pháp lý AI chuyên nghiệp. Nhiệm vụ của bạn là tổng hợp thông tin từ hai nguồn: **Lịch sử trò chuyện** và **Các đoạn luật liên quan** để đưa ra một câu trả lời duy nhất, liền mạch và tự nhiên cho người dùng.\n\n"
266
- "**QUY TẮC BẮT BUỘC:**\n"
267
- "1. **Hành văn tự nhiên:** Trả lời thẳng vào câu hỏi. **Không** bắt đầu bằng các cụm từ như 'Dựa trên thông tin được cung cấp', 'Theo các đoạn luật', v.v.\n"
268
- "2. **Nguồn trích dẫn:** Khi cần trích dẫn, chỉ nêu nguồn từ văn bản luật (ví dụ: 'theo Khoản 1, Điều 5...'). **Tuyệt đối không** trích dẫn nguồn là 'từ lịch sử trò chuyện'.\n"
269
- "3. **Tổng hợp thông tin:** Phải kết hợp thông tin từ cả hai nguồn một cách mượt mà. Ví dụ, nếu lịch sử trò chuyện đã có mức phạt cho xe máy, câu hỏi hiện tại là về xe máy điện, hãy sử dụng thông tin từ văn bản luật để xác định xe máy điện thuộc nhóm xe nào, sau đó áp dụng mức phạt đã biết từ lịch sử.\n"
270
- "4. **Ngắn gọn, chính xác:** Luôn trả lời ngắn gọn, rõ ràng và chỉ dựa vào thông tin được cung cấp.\n\n"
271
- f"### Lịch sử trò chuyện:\n{conversation_context}\n\n"
272
- f"### Các đoạn luật liên quan:\n{full_result_text}\n\n"
273
- f"### Câu hỏi của người dùng:\n{question}\n\n"
274
- "### Trả lời:"
275
- )
276
-
277
- asyncio.create_task(self.facebook.send_message(message=f"{get_random_message(SUMMARY_STATUS_MESSAGES)}"))
278
-
279
- try:
280
- answer = await self.channel.llm.generate_text(prompt)
281
- if answer and answer.strip():
282
- logger.info(f"LLM trả về câu trả lời: \n\tanswer: {answer}")
283
- return answer.strip()
284
- else:
285
- logger.error(f"LLM không trả về câu trả lời phù hợp: \n\tanswer: {answer}")
286
- except Exception as e:
287
- logger.error(f"LLM không sẵn sàng: {e}\n{traceback.format_exc()}")
288
-
289
- return "Dựa trên thông tin bạn cung cấp, tôi đã tìm thấy một số quy định liên quan. Tuy nhiên, tôi đang gặp chút khó khăn trong việc tóm tắt. Bạn vui lòng tham khảo nội dung chi tiết trong các văn bản luật nhé."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
290
 
291
  async def create_facebook_post(self, page_token: str, sender_id: str, history: List[Dict[str, Any]]) -> str:
292
  logger.info(f"[MOCK] Creating Facebook post for sender_id={sender_id} with history={history}")
 
3
  import traceback
4
  import json
5
  from loguru import logger
6
+ import random # random is used in the original file, but get_random_message is preferred
7
+ from .constants import START_SEARCHING_MESSAGES, SUMMARY_STATUS_MESSAGES, PROCESSING_STATUS_MESSAGES, FOUND_REGULATIONS_MESSAGES, BATCH_STATUS_MESSAGES, LLM_RETRY_WAIT_MESSAGES
8
+ from .utils import get_random_message, _safe_truncate
9
  from .facebook import FacebookClient
10
+ from .gemini_client import GeminiResponseError
11
  from app.config import get_settings
12
  import re
13
 
 
238
  async def format_search_results(self, conversation_context: str, question: str, matches: List[Dict[str, Any]], page_token: str, sender_id: str) -> str:
239
  if not matches:
240
  return "Không tìm thấy kết quả phù hợp."
241
+
242
  asyncio.create_task(self.facebook.send_message(message=get_random_message(FOUND_REGULATIONS_MESSAGES)))
243
+
244
  #TODO: thời gian rerank kéo dài hơn 30s. Tạm thời bỏ qua bước reranking cho đến khi tìm ra phương án optimize
245
  # try:
246
  # reranked = await self.channel.reranker.rerank(question, matches, top_k=10)
247
  # if reranked: matches = reranked
248
  # except Exception as e:
249
  # logger.error(f"[RERANK] Lỗi khi rerank: {e}")
250
+
251
+ # --- START: Logical Retry Loop for MAX_TOKENS/SAFETY ---
252
+ max_logical_retries = 3
253
+ original_matches = list(matches)
254
+
255
+ for attempt in range(max_logical_retries + 1):
256
+ current_matches = original_matches
257
+ if attempt > 0:
258
+ reduction_factor = 1.0 - (0.2 * attempt)
259
+ new_count = int(len(original_matches) * reduction_factor)
260
+ current_matches = original_matches[:new_count]
261
+ if not current_matches:
262
+ logger.error(f"[LLM_RETRY] No more documents to reduce. Failing.")
263
+ break
264
+ logger.warning(f"[LLM_RETRY] Attempt {attempt + 1}. Reducing documents to {len(current_matches)}.")
265
+
266
+ full_result_text = ""
267
+ def arr_to_str(arr, sep=", "):
268
+ if not arr: return ""
269
+ return sep.join([str(x) for x in arr if x not in (None, "")]) if isinstance(arr, list) else str(arr)
270
+
271
+ for i, match in enumerate(current_matches, 1):
272
+ full_result_text += f"\n\n* Nguồn: {(match.get('structure') or '').strip()}:\n"
273
+ fullContent = (match.get('fullcontent') or '').strip()
274
+ full_result_text += f"{fullContent}"
275
+ hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ")
276
+ if hpbsnoidung: full_result_text += f"\n- Hình phạt bổ sung: {hpbsnoidung}"
277
+ bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ")
278
+ if bpkpnoidung: full_result_text += f"\n- Biện pháp khắc phục: {bpkpnoidung}"
279
+ if match.get('cr_impounding'): full_result_text += f"\n- Tạm giữ phương tiện: 07 ngày"
280
+
281
+ prompt = (
282
+ "Bạn một trợ lý pháp lý AI chuyên nghiệp. Nhiệm vụ của bạn là tổng hợp thông tin từ hai nguồn: **Lịch sử trò chuyện** và **Các đoạn luật liên quan** để đưa ra một câu trả lời duy nhất, liền mạch và tự nhiên cho người dùng.\n\n"
283
+ "**QUY TẮC BẮT BUỘC:**\n"
284
+ "1. **Hành văn tự nhiên:** Trả lời thẳng vào câu hỏi. **Không** bắt đầu bằng các cụm từ như 'Dựa trên thông tin được cung cấp', 'Theo các đoạn luật', v.v.\n"
285
+ "2. **Nguồn trích dẫn:** Khi cần trích dẫn, chỉ nêu nguồn từ văn bản luật (ví dụ: 'theo Khoản 1, Điều 5...'). **Tuyệt đối không** trích dẫn nguồn là 'từ lịch sử trò chuyện'.\n"
286
+ "3. **Tổng hợp thông tin:** Phải kết hợp thông tin từ cả hai nguồn một cách mượt mà. Ví dụ, nếu lịch sử trò chuyện đã có mức phạt cho xe máy, và câu hỏi hiện tại là về xe máy điện, hãy sử dụng thông tin từ văn bản luật để xác định xe máy điện thuộc nhóm xe nào, sau đó áp dụng mức phạt đã biết từ lịch sử.\n"
287
+ "4. **Ngắn gọn, chính xác:** Luôn trả lời ngắn gọn, rõ ràng và chỉ dựa vào thông tin được cung cấp.\n\n"
288
+ f"### Lịch sử trò chuyện:\n{conversation_context}\n\n"
289
+ f"### Các đoạn luật liên quan:\n{full_result_text}\n\n"
290
+ f"### Câu hỏi của người dùng:\n{question}\n\n"
291
+ "### Trả lời:"
292
+ )
293
+
294
+ asyncio.create_task(self.facebook.send_message(message=f"{get_random_message(SUMMARY_STATUS_MESSAGES)}"))
295
+
296
+ try:
297
+ from google.generativeai.types import GenerationConfig
298
+ generation_config = GenerationConfig(max_output_tokens=2048, temperature=0.5)
299
+ answer = await self.channel.llm.generate_text(prompt, generation_config=generation_config)
300
+
301
+ if answer and answer.strip():
302
+ logger.info(f"LLM trả về câu trả lời thành công: \n\tanswer: {_safe_truncate(answer)}")
303
+ return answer.strip()
304
+ else:
305
+ logger.warning("LLM trả về câu trả lời hợp lệ nhưng rỗng. Sẽ trả về tin nhắn xin lỗi.")
306
+ break
307
+
308
+ except GeminiResponseError as e:
309
+ logger.error(f"[LLM_RETRY] Lỗi nội dung từ Gemini, sẽ thử lại với ít tài liệu hơn. Lý do: {e}")
310
+ if attempt < max_logical_retries:
311
+ asyncio.create_task(self.facebook.send_message(message=get_random_message(LLM_RETRY_WAIT_MESSAGES)))
312
+ continue
313
+ else:
314
+ logger.error(f"[LLM_RETRY] Đã hết số lần thử lại logic. Thất bại.")
315
+ break
316
+
317
+ except Exception as e:
318
+ logger.error(f"LLM không sẵn sàng sau tất cả các lần thử lại: {e}\n{traceback.format_exc()}")
319
+ break
320
+
321
+ # Fallback message if all attempts fail
322
+ logger.error("Tất cả các lần gọi LLM đều thất bại. Trả về tin nhắn xin lỗi cho người dùng.")
323
+ return "Xin lỗi bạn, tôi đang gặp một chút trục trặc kỹ thuật trong việc tổng hợp câu trả lời. Bạn có thể vui lòng đặt lại câu hỏi hoặc thử lại sau một lát được không ạ?"
324
 
325
  async def create_facebook_post(self, page_token: str, sender_id: str, history: List[Dict[str, Any]]) -> str:
326
  logger.info(f"[MOCK] Creating Facebook post for sender_id={sender_id} with history={history}")
app/supabase_db.py CHANGED
@@ -1,15 +1,24 @@
1
  from typing import Any, Dict, List, Optional
2
  from postgrest.types import CountMethod
3
- from supabase.client import create_client, Client
 
4
  from loguru import logger
5
  import re
6
- import time
7
  import httpx
 
8
 
9
  from .utils import timing_decorator_sync
10
  from .constants import VEHICLE_KEYWORD_TO_COLUMN, VIETNAMESE_STOP_WORDS, VIETNAMESE_STOP_PHRASES
11
  from .config import get_settings
12
 
 
 
 
 
 
 
 
 
13
  def remove_stop_phrases(text, stop_phrases):
14
  for phrase in stop_phrases:
15
  # Sửa: Không escape dấu cách trong phrase, chỉ escape các ký tự đặc biệt khác
@@ -25,11 +34,15 @@ class SupabaseClient:
25
  Input: url (str), key (str)
26
  Output: SupabaseClient instance.
27
  """
28
- self.client: Client = create_client(url, key)
 
 
 
29
  settings = get_settings()
30
  self.default_match_count = settings.match_count
31
 
32
  @timing_decorator_sync
 
33
  def get_page_token(self, page_id: str):
34
  """
35
  Lấy access token của Facebook page từ Supabase.
@@ -41,11 +54,15 @@ class SupabaseClient:
41
  if response.data and len(response.data) > 0:
42
  return response.data[0]['token']
43
  return None
44
- except Exception as e:
45
- logger.error(f"Error getting page token: {e}")
46
- return None
 
 
 
47
 
48
  @timing_decorator_sync
 
49
  def match_documents(self, embedding: List[float], match_count: Optional[int] = None, vehicle_keywords: Optional[List[str]] = None, user_question: str = '', keyword_threshold: float = 0.01, vector_threshold: float = 0.3, rrf_k: int = 60):
50
  """
51
  Truy vấn vector similarity search qua RPC match_documents.
@@ -86,30 +103,21 @@ class SupabaseClient:
86
  if vehicle_columns:
87
  payload['vehicle_filters'] = vehicle_columns
88
 
89
- max_retries = 3
90
- for attempt in range(max_retries):
91
- try:
92
- response = self.client.rpc(
93
- 'match_documents',
94
- payload
95
- ).execute()
96
-
97
- if response.data:
98
- return response.data
99
- return []
100
- except httpx.TimeoutException:
101
- logger.warning(f"Supabase RPC 'match_documents' timeout on attempt {attempt + 1}/{max_retries}. Retrying...")
102
- if attempt == max_retries - 1:
103
- logger.error(f"Supabase RPC failed after {max_retries} attempts due to timeout.")
104
- return []
105
- time.sleep(1 * (2 ** attempt)) # Exponential backoff: 1s, 2s, 4s
106
- except Exception as e:
107
- logger.error(f"Error matching documents: {e}")
108
- return []
109
-
110
- return [] # Fallback in case loop finishes without returning
111
 
112
  @timing_decorator_sync
 
113
  def store_embedding(self, text: str, embedding: List[float], metadata: Dict[str, Any]):
114
  """
115
  Lưu embedding vào Supabase.
@@ -124,11 +132,15 @@ class SupabaseClient:
124
  }).execute()
125
 
126
  return bool(response.data)
 
 
 
127
  except Exception as e:
128
- logger.error(f"Error storing embedding: {e}")
129
  return False
130
 
131
  @timing_decorator_sync
 
132
  def store_document_chunk(self, chunk_data: Dict[str, Any]) -> bool:
133
  """
134
  Lưu document chunk vào Supabase.
@@ -181,11 +193,15 @@ class SupabaseClient:
181
  logger.error(f"Failed to store chunk {processed_data.get('id', 'unknown')}")
182
  return False
183
 
 
 
 
184
  except Exception as e:
185
- logger.error(f"Error storing document chunk: {e}")
186
  return False
187
 
188
  @timing_decorator_sync
 
189
  def delete_all_document_chunks(self) -> bool:
190
  """
191
  Xóa toàn bộ bảng document_chunks.
@@ -196,11 +212,15 @@ class SupabaseClient:
196
  response = self.client.table('document_chunks').delete().execute()
197
  logger.info(f"Successfully deleted all document chunks")
198
  return True
 
 
 
199
  except Exception as e:
200
- logger.error(f"Error deleting all document chunks: {e}")
201
  return False
202
 
203
  @timing_decorator_sync
 
204
  def get_document_chunks_by_vanbanid(self, vanbanid: int) -> List[Dict[str, Any]]:
205
  """
206
  Lấy tất cả chunks của một văn bản theo vanbanid.
@@ -213,11 +233,15 @@ class SupabaseClient:
213
  logger.info(f"Found {len(response.data)} chunks for vanbanid {vanbanid}")
214
  return response.data
215
  return []
 
 
 
216
  except Exception as e:
217
- logger.error(f"Error getting document chunks for vanbanid {vanbanid}: {e}")
218
  return []
219
 
220
  @timing_decorator_sync
 
221
  def delete_document_chunks_by_vanbanid(self, vanbanid: int) -> bool:
222
  """
223
  Xóa tất cả chunks của một văn bản theo vanbanid.
@@ -228,11 +252,15 @@ class SupabaseClient:
228
  response = self.client.table('document_chunks').delete().eq('vanbanid', vanbanid).execute()
229
  logger.info(f"Successfully deleted all chunks for vanbanid {vanbanid}")
230
  return True
 
 
 
231
  except Exception as e:
232
- logger.error(f"Error deleting chunks for vanbanid {vanbanid}: {e}")
233
  return False
234
 
235
  @timing_decorator_sync
 
236
  def get_all_document_chunks(self) -> List[Dict[str, Any]]:
237
  """
238
  Lấy toàn bộ dữ liệu từ bảng document_chunks.
@@ -283,6 +311,9 @@ class SupabaseClient:
283
  logger.info(f"[SUPABASE] Fetched {page_count} pages with page_size={page_size}")
284
  return all_chunks
285
 
 
 
 
286
  except Exception as e:
287
- logger.error(f"[SUPABASE] Error fetching document chunks: {e}")
288
  return []
 
1
  from typing import Any, Dict, List, Optional
2
  from postgrest.types import CountMethod
3
+ from supabase.client import create_client, Client, ClientOptions
4
+ from postgrest.exceptions import APIError
5
  from loguru import logger
6
  import re
 
7
  import httpx
8
+ from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
9
 
10
  from .utils import timing_decorator_sync
11
  from .constants import VEHICLE_KEYWORD_TO_COLUMN, VIETNAMESE_STOP_WORDS, VIETNAMESE_STOP_PHRASES
12
  from .config import get_settings
13
 
14
+ # --- Cơ chế retry mạnh mẽ và có thể tái sử dụng ---
15
+ retry_on_supabase_error = retry(
16
+ stop=stop_after_attempt(4), # 1 lần gọi gốc + 3 lần thử lại
17
+ wait=wait_exponential(multiplier=5, min=10, max=60), # Chờ 10s, 20s, 40s
18
+ retry=retry_if_exception_type((httpx.HTTPError, APIError)),
19
+ before_sleep=lambda retry_state: logger.warning(f"Supabase call failed, retrying... Attempt: {retry_state.attempt_number}, Error: {retry_state.outcome.exception()}")
20
+ )
21
+
22
  def remove_stop_phrases(text, stop_phrases):
23
  for phrase in stop_phrases:
24
  # Sửa: Không escape dấu cách trong phrase, chỉ escape các ký tự đặc biệt khác
 
34
  Input: url (str), key (str)
35
  Output: SupabaseClient instance.
36
  """
37
+ # Tăng thời gian timeout mặc định của client để xử các truy vấn nặng
38
+ opts = ClientOptions(postgrest_client_timeout=60.0)
39
+ self.client: Client = create_client(url, key, options=opts)
40
+
41
  settings = get_settings()
42
  self.default_match_count = settings.match_count
43
 
44
  @timing_decorator_sync
45
+ @retry_on_supabase_error
46
  def get_page_token(self, page_id: str):
47
  """
48
  Lấy access token của Facebook page từ Supabase.
 
54
  if response.data and len(response.data) > 0:
55
  return response.data[0]['token']
56
  return None
57
+ except (httpx.HTTPError, APIError) as e:
58
+ logger.error(f"Error getting page token after retries: {e}")
59
+ raise # Ném lại lỗi để tenacity có thể bắt và retry
60
+ except Exception as e: # Bắt các lỗi không mong muốn khác
61
+ logger.exception(f"An unexpected error occurred while getting page token: {e}")
62
+ return None # Không retry với lỗi không mong muốn
63
 
64
  @timing_decorator_sync
65
+ @retry_on_supabase_error
66
  def match_documents(self, embedding: List[float], match_count: Optional[int] = None, vehicle_keywords: Optional[List[str]] = None, user_question: str = '', keyword_threshold: float = 0.01, vector_threshold: float = 0.3, rrf_k: int = 60):
67
  """
68
  Truy vấn vector similarity search qua RPC match_documents.
 
103
  if vehicle_columns:
104
  payload['vehicle_filters'] = vehicle_columns
105
 
106
+ try:
107
+ response = self.client.rpc(
108
+ 'match_documents',
109
+ payload
110
+ ).execute()
111
+ return response.data or []
112
+ except (httpx.HTTPError, APIError) as e:
113
+ logger.error(f"Error matching documents after retries: {e}")
114
+ raise
115
+ except Exception as e:
116
+ logger.exception(f"An unexpected error occurred in match_documents: {e}")
117
+ return []
 
 
 
 
 
 
 
 
 
 
118
 
119
  @timing_decorator_sync
120
+ @retry_on_supabase_error
121
  def store_embedding(self, text: str, embedding: List[float], metadata: Dict[str, Any]):
122
  """
123
  Lưu embedding vào Supabase.
 
132
  }).execute()
133
 
134
  return bool(response.data)
135
+ except (httpx.HTTPError, APIError) as e:
136
+ logger.error(f"Error storing embedding after retries: {e}")
137
+ raise
138
  except Exception as e:
139
+ logger.exception(f"An unexpected error occurred while storing embedding: {e}")
140
  return False
141
 
142
  @timing_decorator_sync
143
+ @retry_on_supabase_error
144
  def store_document_chunk(self, chunk_data: Dict[str, Any]) -> bool:
145
  """
146
  Lưu document chunk vào Supabase.
 
193
  logger.error(f"Failed to store chunk {processed_data.get('id', 'unknown')}")
194
  return False
195
 
196
+ except (httpx.HTTPError, APIError) as e:
197
+ logger.error(f"Error storing document chunk after retries: {e}")
198
+ raise
199
  except Exception as e:
200
+ logger.exception(f"An unexpected error occurred while storing document chunk: {e}")
201
  return False
202
 
203
  @timing_decorator_sync
204
+ @retry_on_supabase_error
205
  def delete_all_document_chunks(self) -> bool:
206
  """
207
  Xóa toàn bộ bảng document_chunks.
 
212
  response = self.client.table('document_chunks').delete().execute()
213
  logger.info(f"Successfully deleted all document chunks")
214
  return True
215
+ except (httpx.HTTPError, APIError) as e:
216
+ logger.error(f"Error deleting all document chunks after retries: {e}")
217
+ raise
218
  except Exception as e:
219
+ logger.exception(f"An unexpected error occurred while deleting all document chunks: {e}")
220
  return False
221
 
222
  @timing_decorator_sync
223
+ @retry_on_supabase_error
224
  def get_document_chunks_by_vanbanid(self, vanbanid: int) -> List[Dict[str, Any]]:
225
  """
226
  Lấy tất cả chunks của một văn bản theo vanbanid.
 
233
  logger.info(f"Found {len(response.data)} chunks for vanbanid {vanbanid}")
234
  return response.data
235
  return []
236
+ except (httpx.HTTPError, APIError) as e:
237
+ logger.error(f"Error getting document chunks for vanbanid {vanbanid} after retries: {e}")
238
+ raise
239
  except Exception as e:
240
+ logger.exception(f"An unexpected error occurred while getting document chunks for vanbanid {vanbanid}: {e}")
241
  return []
242
 
243
  @timing_decorator_sync
244
+ @retry_on_supabase_error
245
  def delete_document_chunks_by_vanbanid(self, vanbanid: int) -> bool:
246
  """
247
  Xóa tất cả chunks của một văn bản theo vanbanid.
 
252
  response = self.client.table('document_chunks').delete().eq('vanbanid', vanbanid).execute()
253
  logger.info(f"Successfully deleted all chunks for vanbanid {vanbanid}")
254
  return True
255
+ except (httpx.HTTPError, APIError) as e:
256
+ logger.error(f"Error deleting chunks for vanbanid {vanbanid} after retries: {e}")
257
+ raise
258
  except Exception as e:
259
+ logger.exception(f"An unexpected error occurred while deleting chunks for vanbanid {vanbanid}: {e}")
260
  return False
261
 
262
  @timing_decorator_sync
263
+ @retry_on_supabase_error
264
  def get_all_document_chunks(self) -> List[Dict[str, Any]]:
265
  """
266
  Lấy toàn bộ dữ liệu từ bảng document_chunks.
 
311
  logger.info(f"[SUPABASE] Fetched {page_count} pages with page_size={page_size}")
312
  return all_chunks
313
 
314
+ except (httpx.HTTPError, APIError) as e:
315
+ logger.error(f"[SUPABASE] Error fetching document chunks after retries: {e}")
316
+ raise
317
  except Exception as e:
318
+ logger.exception(f"An unexpected error occurred while fetching document chunks: {e}")
319
  return []