VietCat commited on
Commit
a9dc0f3
·
1 Parent(s): 3705270

update query and rerank in parallel

Browse files
Files changed (4) hide show
  1. ENVIRONMENT_VARIABLES.md +7 -0
  2. app/config.py +25 -10
  3. app/message_processor.py +344 -166
  4. app/reranker.py +18 -14
ENVIRONMENT_VARIABLES.md CHANGED
@@ -14,6 +14,12 @@
14
  - **Usage**: `MAX_DOCS_TO_RERANK=20`
15
  - **Impact**: Ảnh hưởng đến số lượng docs được xử lý trong reranking
16
 
 
 
 
 
 
 
17
  ## Logging Configuration
18
 
19
  ### LOG_LEVEL
@@ -137,6 +143,7 @@
137
  # Search Configuration
138
  MATCH_COUNT=15
139
  MAX_DOCS_TO_RERANK=15
 
140
 
141
  # Logging Configuration
142
  LOG_LEVEL=DEBUG
 
14
  - **Usage**: `MAX_DOCS_TO_RERANK=20`
15
  - **Impact**: Ảnh hưởng đến số lượng docs được xử lý trong reranking
16
 
17
+ ### RERANK_MIN_SCORE
18
+ - **Description**: Ngưỡng điểm tối thiểu để một document được giữ lại sau khi rerank.
19
+ - **Default**: `7.0`
20
+ - **Usage**: `RERANK_MIN_SCORE=7.5`
21
+ - **Impact**: Ảnh hưởng đến số lượng và chất lượng kết quả cuối cùng. Điểm cao hơn sẽ trả về ít kết quả hơn nhưng liên quan hơn.
22
+
23
  ## Logging Configuration
24
 
25
  ### LOG_LEVEL
 
143
  # Search Configuration
144
  MATCH_COUNT=15
145
  MAX_DOCS_TO_RERANK=15
146
+ RERANK_MIN_SCORE=7.0
147
 
148
  # Logging Configuration
149
  LOG_LEVEL=DEBUG
app/config.py CHANGED
@@ -8,46 +8,60 @@ from loguru import logger
8
  load_dotenv()
9
 
10
  # Debug: Check environment variables
11
- logger.info(f"[CONFIG] FACEBOOK_APP_SECRET from env: {os.getenv('FACEBOOK_APP_SECRET', 'NOT_SET')[:5]}...")
12
- logger.info(f"[CONFIG] FACEBOOK_VERIFY_TOKEN from env: {os.getenv('FACEBOOK_VERIFY_TOKEN', 'NOT_SET')[:5]}...")
13
- logger.info(f"[CONFIG] SUPABASE_URL from env: {os.getenv('SUPABASE_URL', 'NOT_SET')[:5]}...")
 
 
 
 
 
 
 
14
 
15
  class Settings(BaseSettings):
16
  """
17
  Lưu trữ và quản lý cấu hình ứng dụng từ biến môi trường.
18
  Các thuộc tính: facebook_verify_token, facebook_app_secret, ...
19
  """
 
20
  # Facebook Configuration
21
  facebook_verify_token: str = os.getenv("FACEBOOK_VERIFY_TOKEN") or ""
22
  facebook_app_secret: str = os.getenv("FACEBOOK_APP_SECRET") or ""
23
  facebook_api_base_url: str = os.getenv("FACEBOOK_API_BASE_URL") or ""
24
-
25
  # Google Sheets Configuration
26
- google_sheets_credentials_file: str = os.getenv("GOOGLE_SHEETS_CREDENTIALS_FILE") or ""
 
 
27
  google_sheets_token_file: str = os.getenv("GOOGLE_SHEETS_TOKEN_FILE") or ""
28
  conversation_sheet_id: str = os.getenv("CONVERSATION_SHEET_ID") or ""
29
-
30
  # Supabase Configuration
31
  supabase_url: str = os.getenv("SUPABASE_URL") or ""
32
  supabase_key: str = os.getenv("SUPABASE_KEY") or ""
33
-
34
  # Server Configuration
35
  host: str = os.getenv("HOST", "0.0.0.0") or ""
36
  port: int = int(os.getenv("PORT", "8000")) or 8000
37
-
38
  # Logging Configuration
39
  log_level: str = os.getenv("LOG_LEVEL", "DEBUG") or "DEBUG"
40
 
41
  # Search Configuration
42
  match_count: int = int(os.getenv("MATCH_COUNT", "20")) or 20
43
  max_docs_to_rerank: int = int(os.getenv("MAX_DOCS_TO_RERANK", "20")) or 20
 
44
 
45
  # Gemini Configuration
46
  # Hỗ trợ nhiều API key và model cho Gemini
47
  # Định nghĩa biến môi trường: GEMINI_API_KEYS="key1,key2,..."; GEMINI_MODELS="model1,model2,..."
48
  gemini_api_keys: str = ""
49
  gemini_models: str = ""
50
- gemini_base_url: str = os.getenv("GEMINI_BASE_URL", "https://generativelanguage.googleapis.com/v1") or ""
 
 
 
51
 
52
  # LLM (chat/completion) provider/model
53
  llm_provider: str = os.getenv("LLM_PROVIDER", "gemini") or ""
@@ -70,6 +84,7 @@ class Settings(BaseSettings):
70
  class Config:
71
  env_file = ".env"
72
 
 
73
  @lru_cache()
74
  def get_settings() -> Settings:
75
  """
@@ -77,4 +92,4 @@ def get_settings() -> Settings:
77
  Input: None
78
  Output: Settings instance.
79
  """
80
- return Settings()
 
8
  load_dotenv()
9
 
10
  # Debug: Check environment variables
11
+ logger.info(
12
+ f"[CONFIG] FACEBOOK_APP_SECRET from env: {os.getenv('FACEBOOK_APP_SECRET', 'NOT_SET')[:5]}..."
13
+ )
14
+ logger.info(
15
+ f"[CONFIG] FACEBOOK_VERIFY_TOKEN from env: {os.getenv('FACEBOOK_VERIFY_TOKEN', 'NOT_SET')[:5]}..."
16
+ )
17
+ logger.info(
18
+ f"[CONFIG] SUPABASE_URL from env: {os.getenv('SUPABASE_URL', 'NOT_SET')[:5]}..."
19
+ )
20
+
21
 
22
  class Settings(BaseSettings):
23
  """
24
  Lưu trữ và quản lý cấu hình ứng dụng từ biến môi trường.
25
  Các thuộc tính: facebook_verify_token, facebook_app_secret, ...
26
  """
27
+
28
  # Facebook Configuration
29
  facebook_verify_token: str = os.getenv("FACEBOOK_VERIFY_TOKEN") or ""
30
  facebook_app_secret: str = os.getenv("FACEBOOK_APP_SECRET") or ""
31
  facebook_api_base_url: str = os.getenv("FACEBOOK_API_BASE_URL") or ""
32
+
33
  # Google Sheets Configuration
34
+ google_sheets_credentials_file: str = (
35
+ os.getenv("GOOGLE_SHEETS_CREDENTIALS_FILE") or ""
36
+ )
37
  google_sheets_token_file: str = os.getenv("GOOGLE_SHEETS_TOKEN_FILE") or ""
38
  conversation_sheet_id: str = os.getenv("CONVERSATION_SHEET_ID") or ""
39
+
40
  # Supabase Configuration
41
  supabase_url: str = os.getenv("SUPABASE_URL") or ""
42
  supabase_key: str = os.getenv("SUPABASE_KEY") or ""
43
+
44
  # Server Configuration
45
  host: str = os.getenv("HOST", "0.0.0.0") or ""
46
  port: int = int(os.getenv("PORT", "8000")) or 8000
47
+
48
  # Logging Configuration
49
  log_level: str = os.getenv("LOG_LEVEL", "DEBUG") or "DEBUG"
50
 
51
  # Search Configuration
52
  match_count: int = int(os.getenv("MATCH_COUNT", "20")) or 20
53
  max_docs_to_rerank: int = int(os.getenv("MAX_DOCS_TO_RERANK", "20")) or 20
54
+ rerank_min_score: float = float(os.getenv("RERANK_MIN_SCORE", "7.0"))
55
 
56
  # Gemini Configuration
57
  # Hỗ trợ nhiều API key và model cho Gemini
58
  # Định nghĩa biến môi trường: GEMINI_API_KEYS="key1,key2,..."; GEMINI_MODELS="model1,model2,..."
59
  gemini_api_keys: str = ""
60
  gemini_models: str = ""
61
+ gemini_base_url: str = (
62
+ os.getenv("GEMINI_BASE_URL", "https://generativelanguage.googleapis.com/v1")
63
+ or ""
64
+ )
65
 
66
  # LLM (chat/completion) provider/model
67
  llm_provider: str = os.getenv("LLM_PROVIDER", "gemini") or ""
 
84
  class Config:
85
  env_file = ".env"
86
 
87
+
88
  @lru_cache()
89
  def get_settings() -> Settings:
90
  """
 
92
  Input: None
93
  Output: Settings instance.
94
  """
95
+ return Settings()
app/message_processor.py CHANGED
@@ -3,14 +3,22 @@ import asyncio
3
  import traceback
4
  import json
5
  from loguru import logger
6
- import random # random is used in the original file, but get_random_message is preferred
7
- from .constants import START_SEARCHING_MESSAGES, SUMMARY_STATUS_MESSAGES, PROCESSING_STATUS_MESSAGES, FOUND_REGULATIONS_MESSAGES, BATCH_STATUS_MESSAGES, LLM_RETRY_WAIT_MESSAGES
 
 
 
 
 
 
 
8
  from .utils import get_random_message, _safe_truncate
9
  from .facebook import FacebookClient
10
  from .gemini_client import GeminiResponseError
11
  from app.config import get_settings
12
  import re
13
 
 
14
  class MessageProcessor:
15
  def __init__(self, channel, sender_id):
16
  self.channel = channel
@@ -20,7 +28,7 @@ class MessageProcessor:
20
  app_secret=get_settings().facebook_app_secret,
21
  page_id=channel.page_id,
22
  page_token=channel.get_page_token(),
23
- sender_id=sender_id
24
  )
25
 
26
  async def process_message(self, message_data: Dict[str, Any]):
@@ -30,16 +38,20 @@ class MessageProcessor:
30
  required_fields = ["sender_id", "page_id", "text", "timestamp"]
31
  for field in required_fields:
32
  if field not in message_data:
33
- logger.error(f"[ERROR] Missing field {field} in message_data: {message_data}")
 
 
34
  return
35
-
36
  loop = asyncio.get_event_loop()
37
  sender_id = message_data["sender_id"]
38
  page_id = message_data["page_id"]
39
  message_text = message_data["text"]
40
  timestamp = message_data["timestamp"]
41
- attachments = message_data.get('attachments', [])
42
- logger.bind(user_id=sender_id, page_id=page_id, message=message_text).info("Processing message")
 
 
43
 
44
  if not message_text and not attachments:
45
  logger.info(f"[DEBUG] Không có message_text và attachments, không xử lý...")
@@ -52,31 +64,46 @@ class MessageProcessor:
52
  logger.info(f"[DEBUG] history: ... {history[-3:]}")
53
 
54
  for row in history:
55
- sheet_timestamps = [str(ts) for ts in row.get('timestamp', [])]
56
  if str(timestamp) in sheet_timestamps:
57
- logger.warning(f"Webhook lặp lại cho sự kiện đã tồn tại (timestamp: {timestamp}). Bỏ qua.")
 
 
58
  return
59
 
60
  log_kwargs = {
61
- 'conversation_id': None, 'recipient_id': sender_id, 'page_id': page_id,
62
- 'originaltext': message_text, 'originalcommand': '', 'originalcontent': '',
63
- 'originalattachments': attachments, 'originalvehicle': '', 'originalaction': '',
64
- 'originalpurpose': '', 'originalquestion': '', 'systemresponse': '',
65
- 'timestamp': [timestamp], 'isdone': False
 
 
 
 
 
 
 
 
 
66
  }
67
 
68
  logger.info(f"[DEBUG] Message cơ bản: {log_kwargs}")
69
- conv = await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**log_kwargs))
 
 
70
  if not conv:
71
  logger.error("Không thể tạo conversation mới!")
72
  return
73
  logger.info(f"[DEBUG] Message history sau lần ghi đầu: {conv}")
74
-
75
- conv['timestamp'] = self.flatten_timestamp(conv['timestamp'])
76
- if timestamp not in conv['timestamp']:
77
- conv['timestamp'].append(timestamp)
78
-
79
- conv_after_update1 = await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**conv))
 
 
80
  if conv_after_update1:
81
  conv = conv_after_update1
82
 
@@ -84,9 +111,13 @@ class MessageProcessor:
84
  if not page_token:
85
  logger.error(f"No access token found for page {message_data['page_id']}")
86
  return
87
-
88
  try:
89
- asyncio.create_task(self.facebook.send_message(message=get_random_message(PROCESSING_STATUS_MESSAGES)))
 
 
 
 
90
  except Exception as e:
91
  if "expired" in str(e).lower():
92
  logger.warning("[FACEBOOK] Token expired, invalidate and refresh")
@@ -98,21 +129,28 @@ class MessageProcessor:
98
 
99
  from app.utils import extract_command, extract_keywords
100
  from app.constants import VEHICLE_KEYWORDS
 
101
  command, remaining_text = extract_command(message_text)
102
-
103
- llm_analysis = None # Khởi tạo là None
104
  try:
105
- llm_analysis = await self.channel.llm.analyze(message_text, self.get_llm_history(history))
 
 
106
  logger.info(f"[LLM][RAW] Kết quả trả về từ analyze: {llm_analysis}")
107
  except GeminiResponseError as e:
108
- logger.error(f"[LLM][ANALYZE] Lỗi nội dung (MAX_TOKENS/SAFETY) khi phân tích câu hỏi: {e}. Sẽ fallback về phương pháp cũ.")
 
 
109
  except Exception as e:
110
- logger.error(f"[LLM][ANALYZE] Lỗi không xác định khi phân tích câu hỏi: {e}. Sẽ fallback về phương pháp cũ.")
111
-
 
 
112
  muc_dich = None
113
- tu_khoa_list = [] # Sửa: đổi tên thành tu_khoa_list và khởi tạo là list rỗng
114
  cau_hoi = None
115
-
116
  # Sửa: Đơn giản hóa logic, vì LLM giờ luôn trả về 1 dict
117
  analysis_data = None
118
  if isinstance(llm_analysis, list) and llm_analysis:
@@ -122,49 +160,64 @@ class MessageProcessor:
122
 
123
  if analysis_data:
124
  # Lấy phương tiện và chuẩn hóa
125
- phuong_tien = self.normalize_vehicle_keyword(analysis_data.get('phuong_tien', ''))
 
 
126
  keywords = [phuong_tien] if phuong_tien else []
127
-
128
- muc_dich = analysis_data.get('muc_dich')
129
-
130
  # Lấy danh sách từ khóa, đảm bảo nó là list
131
- raw_tu_khoa = analysis_data.get('tu_khoa', [])
132
  if isinstance(raw_tu_khoa, list):
133
  tu_khoa_list = raw_tu_khoa
134
  elif isinstance(raw_tu_khoa, str) and raw_tu_khoa:
135
- tu_khoa_list = [raw_tu_khoa] # Chuyển string thành list 1 phần tử
136
-
137
- cau_hoi = analysis_data.get('cau_hoi')
138
  else:
139
  # Fallback logic cũ nếu LLM không phân tích được
140
  keywords = extract_keywords(message_text, VEHICLE_KEYWORDS)
141
  cau_hoi = message_text
142
- for kw in keywords: cau_hoi = cau_hoi.replace(kw, "")
 
143
  cau_hoi = cau_hoi.strip()
144
-
145
  # Sửa: Log danh sách từ khóa
146
- logger.info(f"[DEBUG] Phương tiện: {keywords} - Từ khóa pháp lý: {tu_khoa_list} - Mục đích: {muc_dich} - Câu hỏi: {cau_hoi}")
 
 
147
 
148
- conv.update({
149
- 'originalcommand': command, 'originalcontent': remaining_text, 'originalvehicle': ','.join(keywords),
150
- # Sửa lỗi: Dùng separator ';;;' để nối các cụm từ khóa,
151
- # tránh bị tách sai ở bước sau.
152
- 'originalaction': ';;;'.join(tu_khoa_list), 'originalpurpose': muc_dich, 'originalquestion': cau_hoi or ""
153
- })
 
 
 
 
 
 
154
 
155
- muc_dich_to_use = muc_dich or conv.get('originalpurpose')
156
  logger.info(f"[DEBUG] Định hướng mục đích xử lý: {muc_dich_to_use}")
157
  conversation_context = self.get_llm_history(history)
158
 
159
  # Gửi tin nhắn trước khi tiến hành tìm kiếm
160
- asyncio.create_task(self.facebook.send_message(message=get_random_message(START_SEARCHING_MESSAGES)))
 
 
 
 
161
  response = None
162
  handlers = {
163
  "hỏi về mức phạt": self.handle_muc_phat,
164
  "hỏi về quy tắc giao thông": self.handle_quy_tac,
165
  "hỏi về báo hiệu đường bộ": self.handle_bao_hieu,
166
  "hỏi về quy trình xử lý vi phạm giao thông": self.handle_quy_trinh,
167
- "thông tin cá nhân của AI": self.handle_ca_nhan
168
  }
169
 
170
  if not command:
@@ -172,90 +225,125 @@ class MessageProcessor:
172
  response = await handler(conv, conversation_context, page_token, sender_id)
173
  else:
174
  if command == "xong":
175
- post_url = await self.create_facebook_post(page_token, conv['recipient_id'], [conv])
176
- response = f"Bài viết đã được tạo thành công! Bạn có thể xem tại: {post_url}" if post_url else "Đã xảy ra lỗi khi tạo bài viết."
177
- conv['isdone'] = True
 
 
 
 
 
 
178
  else:
179
- response = "Vui lòng cung cấp thêm thông tin và gõ lệnh \\xong khi hoàn tất."
180
- conv['isdone'] = False
 
 
181
 
182
  asyncio.create_task(self.facebook.send_message(message=response))
183
-
184
- conv['systemresponse'] = response
185
-
186
  logger.info(f"Chuẩn bị ghi/cập nhật dữ liệu cuối cùng vào sheet: {conv}")
187
-
188
  loop.run_in_executor(None, lambda: sheets_client.log_conversation(**conv))
189
-
190
  return
191
-
192
- def get_latest_timestamp(self,ts_value):
193
- if isinstance(ts_value, (int, float)): return int(ts_value)
 
194
  if isinstance(ts_value, str):
195
- try: return int(json.loads(ts_value))
196
- except:
197
- try: return int(ts_value)
198
- except: return 0
 
 
 
199
  if isinstance(ts_value, list):
200
- if not ts_value: return 0
201
- return max([self.get_latest_timestamp(item) for item in ts_value]) if ts_value else 0
 
 
 
 
 
202
  return 0
203
-
204
  def get_llm_history(self, history: List[Dict[str, Any]]) -> str:
205
  """
206
  Định dạng lịch sử hội thoại thành một chuỗi văn bản duy nhất,
207
  bao gồm cả các từ khóa đã sử dụng để cung cấp ngữ cảnh cho LLM.
208
  """
209
- sorted_history = sorted(history, key=lambda row: self.get_latest_timestamp(row.get('timestamp', 0)))
210
-
 
 
211
  # Lấy 5 lượt hội thoại gần nhất để tránh context quá dài
212
  recent_history = sorted_history[-5:]
213
-
214
  context_lines = []
215
  for row in recent_history:
216
- user_text = row.get('originaltext', '').strip()
217
- assistant_text = row.get('systemresponse', '').strip()
218
- keywords_used = row.get('originalaction', '').strip()
219
 
220
  if user_text:
221
- context_lines.append(f"##Người dùng##: {user_text} (từ khóa đã dùng: {keywords_used})")
222
-
 
 
223
  if assistant_text:
224
  context_lines.append(f"##Trợ lý##: {assistant_text}")
225
-
226
  return "\n".join(context_lines)
227
 
228
  def flatten_timestamp(self, ts):
229
  flat = []
230
- if not isinstance(ts, list): ts = [ts]
 
231
  for t in ts:
232
- if isinstance(t, list): flat.extend(self.flatten_timestamp(t))
233
- else: flat.append(t)
 
 
234
  return flat
235
 
236
  def normalize_vehicle_keyword(self, keyword: str) -> str:
237
  from app.constants import VEHICLE_KEYWORDS
238
  import difflib
239
- if not keyword: return ""
240
- matches = difflib.get_close_matches(keyword.lower(), [k.lower() for k in VEHICLE_KEYWORDS], n=1, cutoff=0.6)
 
 
 
 
241
  if matches:
242
  for k in VEHICLE_KEYWORDS:
243
- if k.lower() == matches[0]: return k
 
244
  return keyword
245
-
246
- async def format_search_results(self, conversation_context: str, question: str, matches: List[Dict[str, Any]], page_token: str, sender_id: str) -> str:
 
 
 
 
 
 
 
247
  if not matches:
248
  return "Không tìm thấy kết quả phù hợp."
249
-
250
- asyncio.create_task(self.facebook.send_message(message=get_random_message(FOUND_REGULATIONS_MESSAGES)))
251
-
252
- #TODO: thời gian rerank kéo dài hơn 30s. Tạm thời bỏ qua bước reranking cho đến khi tìm ra phương án optimize
253
- try:
254
- reranked = await self.channel.reranker.rerank(question, matches, top_k=10)
255
- if reranked: matches = reranked
256
- except Exception as e:
257
- logger.error(f"[RERANK] Lỗi khi rerank: {e}")
258
-
259
  # --- START: Logical Retry Loop for MAX_TOKENS/SAFETY ---
260
  max_logical_retries = 3
261
  original_matches = list(matches)
@@ -269,23 +357,36 @@ class MessageProcessor:
269
  if not current_matches:
270
  logger.error(f"[LLM_RETRY] No more documents to reduce. Failing.")
271
  break
272
- logger.warning(f"[LLM_RETRY] Attempt {attempt + 1}. Reducing documents to {len(current_matches)}.")
 
 
273
 
274
  full_result_text = ""
 
275
  def arr_to_str(arr, sep=", "):
276
- if not arr: return ""
277
- return sep.join([str(x) for x in arr if x not in (None, "")]) if isinstance(arr, list) else str(arr)
 
 
 
 
 
278
 
279
  for i, match in enumerate(current_matches, 1):
280
- full_result_text += f"\n\n* Nguồn: {(match.get('structure') or '').strip()}:\n"
281
- fullContent = (match.get('fullcontent') or '').strip()
 
 
282
  full_result_text += f"{fullContent}"
283
- hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ")
284
- if hpbsnoidung: full_result_text += f"\n- Hình phạt bổ sung: {hpbsnoidung}"
285
- bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ")
286
- if bpkpnoidung: full_result_text += f"\n- Biện pháp khắc phục: {bpkpnoidung}"
287
- if match.get('cr_impounding'): full_result_text += f"\n- Tạm giữ phương tiện: 07 ngày"
288
-
 
 
 
289
  prompt = (
290
  "Bạn là một trợ lý pháp lý AI chuyên nghiệp. Nhiệm vụ của bạn là tổng hợp thông tin từ hai nguồn: **Lịch sử trò chuyện** và **Các đoạn luật liên quan** để đưa ra một câu trả lời duy nhất, liền mạch và tự nhiên cho người dùng.\n\n"
291
  "**QUY TẮC BẮT BUỘC:**\n"
@@ -298,135 +399,196 @@ class MessageProcessor:
298
  f"### Câu hỏi của người dùng:\n{question}\n\n"
299
  "### Trả lời:"
300
  )
301
-
302
- asyncio.create_task(self.facebook.send_message(message=f"{get_random_message(SUMMARY_STATUS_MESSAGES)}"))
303
-
 
 
 
 
304
  try:
305
  # Đã bỏ cấu hình tường minh để sử dụng cài đặt mặc định của thư viện Gemini.
306
  answer = await self.channel.llm.generate_text(prompt)
307
-
308
  if answer and answer.strip():
309
- logger.info(f"LLM trả về câu trả lời thành công: \n\tanswer: {_safe_truncate(answer)}")
 
 
310
  return answer.strip()
311
  else:
312
- logger.warning("LLM trả về câu trả lời hợp lệ nhưng rỗng. Sẽ trả về tin nhắn xin lỗi.")
 
 
313
  break
314
 
315
  except GeminiResponseError as e:
316
- logger.error(f"[LLM_RETRY] Lỗi nội dung từ Gemini, sẽ thử lại với ít tài liệu hơn. Lý do: {e}")
 
 
317
  if attempt < max_logical_retries:
318
- asyncio.create_task(self.facebook.send_message(message=get_random_message(LLM_RETRY_WAIT_MESSAGES)))
 
 
 
 
319
  continue
320
  else:
321
  logger.error(f"[LLM_RETRY] Đã hết số lần thử lại logic. Thất bại.")
322
  break
323
 
324
  except Exception as e:
325
- logger.error(f"LLM không sẵn sàng sau tất cả các lần thử lại: {e}\n{traceback.format_exc()}")
 
 
326
  break
327
 
328
  # Fallback message if all attempts fail
329
- logger.error("Tất cả các lần gọi LLM đều thất bại. Trả về tin nhắn xin lỗi cho người dùng.")
 
 
330
  return "Xin lỗi bạn, tôi đang gặp một chút trục trặc kỹ thuật trong việc tổng hợp câu trả lời. Bạn có thể vui lòng đặt lại câu hỏi hoặc thử lại sau một lát được không ạ?"
331
 
332
- async def create_facebook_post(self, page_token: str, sender_id: str, history: List[Dict[str, Any]]) -> str:
333
- logger.info(f"[MOCK] Creating Facebook post for sender_id={sender_id} with history={history}")
334
- return "https://facebook.com/mock_post_url"
 
 
 
 
335
 
336
- async def _search_and_rerank_task(self, keyword: str, full_query_context: str, vehicle_keywords: List[str]) -> List[Dict[str, Any]]:
 
 
337
  """
338
  Hàm trợ giúp để thực hiện một tác vụ song song: query từ Supabase và sau đó rerank kết quả.
339
  LƯU Ý: Việc rerank cho mỗi luồng riêng lẻ có thể tốn kém và không hiệu quả về chất lượng kết quả cuối cùng.
340
  """
341
  try:
342
  logger.info(f"[SEARCH_RERANK_TASK] Bắt đầu tác vụ cho từ khóa: '{keyword}'")
343
-
344
  # 1. Query Supabase
345
  embedding = await self.channel.embedder.create_embedding(keyword)
346
  loop = asyncio.get_event_loop()
347
  match_count = get_settings().match_count
348
-
349
  matches = await loop.run_in_executor(
350
  None,
351
  lambda: self.channel.supabase.match_documents(
352
  embedding=embedding,
353
  match_count=match_count,
354
  user_question=keyword,
355
- vehicle_keywords=vehicle_keywords
356
- )
357
  )
358
-
359
  if not matches:
360
- logger.info(f"[SEARCH_RERANK_TASK] Không tìm thấy kết quả nào từ Supabase cho từ khóa: '{keyword}'")
 
 
361
  return []
362
 
363
- logger.info(f"[SEARCH_RERANK_TASK] Tìm thấy {len(matches)} kết quả. Bắt đầu rerank cho từ khóa: '{keyword}'")
 
 
364
 
365
  # 2. Rerank (Tạm thời bỏ qua theo logic code gốc, nhưng nếu bật sẽ chạy ở đây)
366
  # CẢNH BÁO: Bước này rất tốn kém và làm chậm hệ thống nếu chạy cho mỗi từ khóa.
367
  # Việc rerank nhiều lần sẽ làm tăng chi phí và có thể chạm giới hạn API.
368
- reranked_matches = matches # Mặc định trả về kết quả gốc nếu rerank bị lỗi hoặc tắt
 
 
369
  try:
370
  # Sử dụng full_query_context để rerank sẽ cho kết quả tốt hơn là chỉ dùng keyword
371
- reranked = await self.channel.reranker.rerank(keyword, matches, top_k=10)
 
 
 
372
  if reranked:
373
  reranked_matches = reranked
374
- logger.info(f"[SEARCH_RERANK_TASK] Rerank thành công cho từ khóa '{keyword}', còn lại {len(reranked_matches)} kết quả.")
 
 
375
  except Exception as e:
376
- logger.error(f"[SEARCH_RERANK_TASK] Lỗi khi rerank cho từ khóa '{keyword}': {e}. Sử dụng kết quả gốc.")
377
-
 
 
378
  return reranked_matches
379
  except Exception as e:
380
- logger.error(f"Lỗi trong tác vụ tìm kiếm và rerank cho từ khóa '{keyword}': {e}")
381
- return [] # Trả về danh sách rỗng để không làm hỏng luồng chung
 
 
382
 
383
  async def handle_muc_phat(self, conv, conversation_context, page_token, sender_id):
384
- vehicle_str = conv.get('originalvehicle', '')
385
- vehicle_keywords = vehicle_str.split(',') if vehicle_str else []
386
- action_keywords_str = conv.get('originalaction', '')
387
- question = conv.get('originalquestion', '')
388
 
389
  # Sửa lỗi: Tách các cụm từ khóa bằng separator ';;;' thay vì khoảng trắng.
390
  # Điều này đảm bảo mỗi từ khóa là một cụm từ hoàn chỉnh.
391
  # Lọc bỏ các chuỗi rỗng có thể xuất hiện nếu action_keywords_str rỗng.
392
- tu_khoa_list = [kw.strip() for kw in action_keywords_str.split(';;;') if kw.strip()]
 
 
393
 
394
  if not tu_khoa_list and not question:
395
- return "Để tra cứu mức phạt, bạn vui lòng cung cấp hành vi vi phạm nhé."
396
 
397
  main_query_for_context = question or action_keywords_str
398
 
399
  try:
400
  # --- 1. Tạo và chạy song song các tác vụ Query -> Rerank ---
401
  search_terms = tu_khoa_list if tu_khoa_list else [main_query_for_context]
402
- tasks = [self._search_and_rerank_task(term, main_query_for_context, vehicle_keywords) for term in search_terms]
 
 
 
 
 
 
 
 
 
 
403
  list_of_reranked_results = await asyncio.gather(*tasks)
404
-
405
  # --- 2. Tổng hợp và loại bỏ kết quả trùng lặp ---
406
  combined_matches = []
407
  seen_ids = set()
408
  for reranked_list in list_of_reranked_results:
409
  for match in reranked_list:
410
- match_id = match.get('doc_id')
411
  if match_id and match_id not in seen_ids:
412
  combined_matches.append(match)
413
  seen_ids.add(match_id)
414
-
415
- logger.info(f"Tổng hợp được {len(combined_matches)} văn bản duy nhất từ các tác vụ song song.")
 
 
416
 
417
  # --- 3. Tạo câu trả lời ---
418
  if combined_matches:
419
- response = await self.format_search_results(conversation_context, main_query_for_context, combined_matches, page_token, sender_id)
 
 
 
 
 
 
420
  else:
421
  response = "Xin lỗi, tôi không tìm thấy thông tin phù hợp với hành vi bạn mô tả."
422
  except Exception as e:
423
  logger.error(f"Lỗi khi tra cứu mức phạt: {e}\n{traceback.format_exc()}")
424
  response = "Đã có lỗi xảy ra trong quá trình tra cứu. Vui lòng thử lại sau."
425
-
426
- conv['isdone'] = True
427
  return response
428
 
429
- async def _handle_general_question(self, conversation_context: str, message_text: str, topic: str) -> str:
 
 
430
  prompt = (
431
  "Bạn là một trợ lý AI am hiểu về luật giao thông Việt Nam. "
432
  "Dựa vào lịch sử trò chuyện và kiến thức của bạn, hãy trả lời câu hỏi của người dùng một cách rõ ràng, ngắn gọn và chính xác.\n"
@@ -440,41 +602,57 @@ class MessageProcessor:
440
  if answer and answer.strip():
441
  return answer.strip()
442
  # If LLM returns an empty answer, provide a generic response.
443
- logger.warning(f"LLM returned an empty answer for general question on topic: {topic}")
 
 
444
  return "Cảm ơn bạn đã hỏi, nhưng tôi chưa có thông tin về vấn đề này. Bạn có thể hỏi câu khác được không?"
445
  except (GeminiResponseError, Exception) as e:
446
  logger.error(f"Error handling general question on topic '{topic}': {e}")
447
  return "Xin lỗi bạn, tôi đang gặp một chút trục trặc kỹ thuật và chưa thể trả lời câu hỏi này. Bạn vui lòng thử lại sau một lát nhé."
448
 
449
  async def handle_khac(self, conv, conversation_context, page_token, sender_id):
450
- conv['isdone'] = True
451
- return await self._handle_general_question(conversation_context, conv['originaltext'], "một vấn đề khác")
 
 
452
 
453
  async def handle_quy_tac(self, conv, conversation_context, page_token, sender_id):
454
- conv['isdone'] = True
455
- return await self.handle_muc_phat(conv, conversation_context, page_token, sender_id)
 
 
456
 
457
  async def handle_bao_hieu(self, conv, conversation_context, page_token, sender_id):
458
- conv['isdone'] = True
459
- return await self.handle_muc_phat(conv, conversation_context, page_token, sender_id)
 
 
460
 
461
  async def handle_quy_trinh(self, conv, conversation_context, page_token, sender_id):
462
- conv['isdone'] = True
463
- return await self.handle_muc_phat(conv, conversation_context, page_token, sender_id)
 
 
464
 
465
  async def handle_ca_nhan(self, conv, conversation_context, page_token, sender_id):
466
  prompt = (
467
  "Biết rằng bạn đã có lịch sử trao đổi như sau:"
468
  f"Lịch sử:\n{conversation_context}\n\n"
469
  'Với các thông tin sau: "Bạn có tên là WeThoong AI, là trợ lý giao thông thông minh. Bạn được anh Viet Cat tạo ra và facebook cá nhân của anh ấy là https://facebook.com/vietcat". '
470
- 'Không được trả lời bạn là AI của Google, OpenAI, hay bất kỳ hãng nào khác. '
471
- 'Hãy trả lời thông minh, hài hước, ngắn gọn cho câu hỏi sau:\n'
472
  f'Câu hỏi:\n"{conv["originaltext"]}"'
473
  )
474
  try:
475
  answer = await self.channel.llm.generate_text(prompt)
476
- conv['isdone'] = True
477
- return answer.strip() if answer and answer.strip() else "Chào bạn, mình là WeThoong AI đây!"
 
 
 
 
478
  except Exception as e:
479
  logger.error(f"Lỗi khi xử lý câu hỏi cá nhân: {e}")
480
- return "Chào bạn, mình là WeThoong AI, trợ lý giao thông thông minh của bạn!"
 
 
 
3
  import traceback
4
  import json
5
  from loguru import logger
6
+ import random # random is used in the original file, but get_random_message is preferred
7
+ from .constants import (
8
+ START_SEARCHING_MESSAGES,
9
+ SUMMARY_STATUS_MESSAGES,
10
+ PROCESSING_STATUS_MESSAGES,
11
+ FOUND_REGULATIONS_MESSAGES,
12
+ BATCH_STATUS_MESSAGES,
13
+ LLM_RETRY_WAIT_MESSAGES,
14
+ )
15
  from .utils import get_random_message, _safe_truncate
16
  from .facebook import FacebookClient
17
  from .gemini_client import GeminiResponseError
18
  from app.config import get_settings
19
  import re
20
 
21
+
22
  class MessageProcessor:
23
  def __init__(self, channel, sender_id):
24
  self.channel = channel
 
28
  app_secret=get_settings().facebook_app_secret,
29
  page_id=channel.page_id,
30
  page_token=channel.get_page_token(),
31
+ sender_id=sender_id,
32
  )
33
 
34
  async def process_message(self, message_data: Dict[str, Any]):
 
38
  required_fields = ["sender_id", "page_id", "text", "timestamp"]
39
  for field in required_fields:
40
  if field not in message_data:
41
+ logger.error(
42
+ f"[ERROR] Missing field {field} in message_data: {message_data}"
43
+ )
44
  return
45
+
46
  loop = asyncio.get_event_loop()
47
  sender_id = message_data["sender_id"]
48
  page_id = message_data["page_id"]
49
  message_text = message_data["text"]
50
  timestamp = message_data["timestamp"]
51
+ attachments = message_data.get("attachments", [])
52
+ logger.bind(user_id=sender_id, page_id=page_id, message=message_text).info(
53
+ "Processing message"
54
+ )
55
 
56
  if not message_text and not attachments:
57
  logger.info(f"[DEBUG] Không có message_text và attachments, không xử lý...")
 
64
  logger.info(f"[DEBUG] history: ... {history[-3:]}")
65
 
66
  for row in history:
67
+ sheet_timestamps = [str(ts) for ts in row.get("timestamp", [])]
68
  if str(timestamp) in sheet_timestamps:
69
+ logger.warning(
70
+ f"Webhook lặp lại cho sự kiện đã tồn tại (timestamp: {timestamp}). Bỏ qua."
71
+ )
72
  return
73
 
74
  log_kwargs = {
75
+ "conversation_id": None,
76
+ "recipient_id": sender_id,
77
+ "page_id": page_id,
78
+ "originaltext": message_text,
79
+ "originalcommand": "",
80
+ "originalcontent": "",
81
+ "originalattachments": attachments,
82
+ "originalvehicle": "",
83
+ "originalaction": "",
84
+ "originalpurpose": "",
85
+ "originalquestion": "",
86
+ "systemresponse": "",
87
+ "timestamp": [timestamp],
88
+ "isdone": False,
89
  }
90
 
91
  logger.info(f"[DEBUG] Message cơ bản: {log_kwargs}")
92
+ conv = await loop.run_in_executor(
93
+ None, lambda: sheets_client.log_conversation(**log_kwargs)
94
+ )
95
  if not conv:
96
  logger.error("Không thể tạo conversation mới!")
97
  return
98
  logger.info(f"[DEBUG] Message history sau lần ghi đầu: {conv}")
99
+
100
+ conv["timestamp"] = self.flatten_timestamp(conv["timestamp"])
101
+ if timestamp not in conv["timestamp"]:
102
+ conv["timestamp"].append(timestamp)
103
+
104
+ conv_after_update1 = await loop.run_in_executor(
105
+ None, lambda: sheets_client.log_conversation(**conv)
106
+ )
107
  if conv_after_update1:
108
  conv = conv_after_update1
109
 
 
111
  if not page_token:
112
  logger.error(f"No access token found for page {message_data['page_id']}")
113
  return
114
+
115
  try:
116
+ asyncio.create_task(
117
+ self.facebook.send_message(
118
+ message=get_random_message(PROCESSING_STATUS_MESSAGES)
119
+ )
120
+ )
121
  except Exception as e:
122
  if "expired" in str(e).lower():
123
  logger.warning("[FACEBOOK] Token expired, invalidate and refresh")
 
129
 
130
  from app.utils import extract_command, extract_keywords
131
  from app.constants import VEHICLE_KEYWORDS
132
+
133
  command, remaining_text = extract_command(message_text)
134
+
135
+ llm_analysis = None # Khởi tạo là None
136
  try:
137
+ llm_analysis = await self.channel.llm.analyze(
138
+ message_text, self.get_llm_history(history)
139
+ )
140
  logger.info(f"[LLM][RAW] Kết quả trả về từ analyze: {llm_analysis}")
141
  except GeminiResponseError as e:
142
+ logger.error(
143
+ f"[LLM][ANALYZE] Lỗi nội dung (MAX_TOKENS/SAFETY) khi phân tích câu hỏi: {e}. Sẽ fallback về phương pháp cũ."
144
+ )
145
  except Exception as e:
146
+ logger.error(
147
+ f"[LLM][ANALYZE] Lỗi không xác định khi phân tích câu hỏi: {e}. Sẽ fallback về phương pháp cũ."
148
+ )
149
+
150
  muc_dich = None
151
+ tu_khoa_list = [] # Sửa: đổi tên thành tu_khoa_list và khởi tạo là list rỗng
152
  cau_hoi = None
153
+
154
  # Sửa: Đơn giản hóa logic, vì LLM giờ luôn trả về 1 dict
155
  analysis_data = None
156
  if isinstance(llm_analysis, list) and llm_analysis:
 
160
 
161
  if analysis_data:
162
  # Lấy phương tiện và chuẩn hóa
163
+ phuong_tien = self.normalize_vehicle_keyword(
164
+ analysis_data.get("phuong_tien", "")
165
+ )
166
  keywords = [phuong_tien] if phuong_tien else []
167
+
168
+ muc_dich = analysis_data.get("muc_dich")
169
+
170
  # Lấy danh sách từ khóa, đảm bảo nó là list
171
+ raw_tu_khoa = analysis_data.get("tu_khoa", [])
172
  if isinstance(raw_tu_khoa, list):
173
  tu_khoa_list = raw_tu_khoa
174
  elif isinstance(raw_tu_khoa, str) and raw_tu_khoa:
175
+ tu_khoa_list = [raw_tu_khoa] # Chuyển string thành list 1 phần tử
176
+
177
+ cau_hoi = analysis_data.get("cau_hoi")
178
  else:
179
  # Fallback logic cũ nếu LLM không phân tích được
180
  keywords = extract_keywords(message_text, VEHICLE_KEYWORDS)
181
  cau_hoi = message_text
182
+ for kw in keywords:
183
+ cau_hoi = cau_hoi.replace(kw, "")
184
  cau_hoi = cau_hoi.strip()
185
+
186
  # Sửa: Log danh sách từ khóa
187
+ logger.info(
188
+ f"[DEBUG] Phương tiện: {keywords} - Từ khóa pháp lý: {tu_khoa_list} - Mục đích: {muc_dich} - Câu hỏi: {cau_hoi}"
189
+ )
190
 
191
+ conv.update(
192
+ {
193
+ "originalcommand": command,
194
+ "originalcontent": remaining_text,
195
+ "originalvehicle": ",".join(keywords),
196
+ # Sửa lỗi: Dùng separator ';;;' để nối các cụm từ khóa,
197
+ # tránh bị tách sai ở bước sau.
198
+ "originalaction": ";;;".join(tu_khoa_list),
199
+ "originalpurpose": muc_dich,
200
+ "originalquestion": cau_hoi or "",
201
+ }
202
+ )
203
 
204
+ muc_dich_to_use = muc_dich or conv.get("originalpurpose")
205
  logger.info(f"[DEBUG] Định hướng mục đích xử lý: {muc_dich_to_use}")
206
  conversation_context = self.get_llm_history(history)
207
 
208
  # Gửi tin nhắn trước khi tiến hành tìm kiếm
209
+ asyncio.create_task(
210
+ self.facebook.send_message(
211
+ message=get_random_message(START_SEARCHING_MESSAGES)
212
+ )
213
+ )
214
  response = None
215
  handlers = {
216
  "hỏi về mức phạt": self.handle_muc_phat,
217
  "hỏi về quy tắc giao thông": self.handle_quy_tac,
218
  "hỏi về báo hiệu đường bộ": self.handle_bao_hieu,
219
  "hỏi về quy trình xử lý vi phạm giao thông": self.handle_quy_trinh,
220
+ "thông tin cá nhân của AI": self.handle_ca_nhan,
221
  }
222
 
223
  if not command:
 
225
  response = await handler(conv, conversation_context, page_token, sender_id)
226
  else:
227
  if command == "xong":
228
+ post_url = await self.create_facebook_post(
229
+ page_token, conv["recipient_id"], [conv]
230
+ )
231
+ response = (
232
+ f"Bài viết đã được tạo thành công! Bạn có thể xem tại: {post_url}"
233
+ if post_url
234
+ else "Đã xảy ra lỗi khi tạo bài viết."
235
+ )
236
+ conv["isdone"] = True
237
  else:
238
+ response = (
239
+ "Vui lòng cung cấp thêm thông tin và gõ lệnh \\xong khi hoàn tất."
240
+ )
241
+ conv["isdone"] = False
242
 
243
  asyncio.create_task(self.facebook.send_message(message=response))
244
+
245
+ conv["systemresponse"] = response
246
+
247
  logger.info(f"Chuẩn bị ghi/cập nhật dữ liệu cuối cùng vào sheet: {conv}")
248
+
249
  loop.run_in_executor(None, lambda: sheets_client.log_conversation(**conv))
250
+
251
  return
252
+
253
+ def get_latest_timestamp(self, ts_value):
254
+ if isinstance(ts_value, (int, float)):
255
+ return int(ts_value)
256
  if isinstance(ts_value, str):
257
+ try:
258
+ return int(json.loads(ts_value))
259
+ except:
260
+ try:
261
+ return int(ts_value)
262
+ except:
263
+ return 0
264
  if isinstance(ts_value, list):
265
+ if not ts_value:
266
+ return 0
267
+ return (
268
+ max([self.get_latest_timestamp(item) for item in ts_value])
269
+ if ts_value
270
+ else 0
271
+ )
272
  return 0
273
+
274
  def get_llm_history(self, history: List[Dict[str, Any]]) -> str:
275
  """
276
  Định dạng lịch sử hội thoại thành một chuỗi văn bản duy nhất,
277
  bao gồm cả các từ khóa đã sử dụng để cung cấp ngữ cảnh cho LLM.
278
  """
279
+ sorted_history = sorted(
280
+ history, key=lambda row: self.get_latest_timestamp(row.get("timestamp", 0))
281
+ )
282
+
283
  # Lấy 5 lượt hội thoại gần nhất để tránh context quá dài
284
  recent_history = sorted_history[-5:]
285
+
286
  context_lines = []
287
  for row in recent_history:
288
+ user_text = row.get("originaltext", "").strip()
289
+ assistant_text = row.get("systemresponse", "").strip()
290
+ keywords_used = row.get("originalaction", "").strip()
291
 
292
  if user_text:
293
+ context_lines.append(
294
+ f"##Người dùng##: {user_text} (từ khóa đã dùng: {keywords_used})"
295
+ )
296
+
297
  if assistant_text:
298
  context_lines.append(f"##Trợ lý##: {assistant_text}")
299
+
300
  return "\n".join(context_lines)
301
 
302
  def flatten_timestamp(self, ts):
303
  flat = []
304
+ if not isinstance(ts, list):
305
+ ts = [ts]
306
  for t in ts:
307
+ if isinstance(t, list):
308
+ flat.extend(self.flatten_timestamp(t))
309
+ else:
310
+ flat.append(t)
311
  return flat
312
 
313
  def normalize_vehicle_keyword(self, keyword: str) -> str:
314
  from app.constants import VEHICLE_KEYWORDS
315
  import difflib
316
+
317
+ if not keyword:
318
+ return ""
319
+ matches = difflib.get_close_matches(
320
+ keyword.lower(), [k.lower() for k in VEHICLE_KEYWORDS], n=1, cutoff=0.6
321
+ )
322
  if matches:
323
  for k in VEHICLE_KEYWORDS:
324
+ if k.lower() == matches[0]:
325
+ return k
326
  return keyword
327
+
328
+ async def format_search_results(
329
+ self,
330
+ conversation_context: str,
331
+ question: str,
332
+ matches: List[Dict[str, Any]],
333
+ page_token: str,
334
+ sender_id: str,
335
+ ) -> str:
336
  if not matches:
337
  return "Không tìm thấy kết quả phù hợp."
338
+
339
+ # TODO: thời gian rerank kéo dài hơn 30s. Tạm thời bỏ qua bước reranking cho đến khi tìm ra phương án optimize
340
+ # try:
341
+ # settings = get_settings()
342
+ # reranked = await self.channel.reranker.rerank(question, matches, min_score=settings.rerank_min_score)
343
+ # if reranked: matches = reranked
344
+ # except Exception as e:
345
+ # logger.error(f"[RERANK] Lỗi khi rerank: {e}")
346
+
 
347
  # --- START: Logical Retry Loop for MAX_TOKENS/SAFETY ---
348
  max_logical_retries = 3
349
  original_matches = list(matches)
 
357
  if not current_matches:
358
  logger.error(f"[LLM_RETRY] No more documents to reduce. Failing.")
359
  break
360
+ logger.warning(
361
+ f"[LLM_RETRY] Attempt {attempt + 1}. Reducing documents to {len(current_matches)}."
362
+ )
363
 
364
  full_result_text = ""
365
+
366
  def arr_to_str(arr, sep=", "):
367
+ if not arr:
368
+ return ""
369
+ return (
370
+ sep.join([str(x) for x in arr if x not in (None, "")])
371
+ if isinstance(arr, list)
372
+ else str(arr)
373
+ )
374
 
375
  for i, match in enumerate(current_matches, 1):
376
+ full_result_text += (
377
+ f"\n\n* Nguồn: {(match.get('structure') or '').strip()}:\n"
378
+ )
379
+ fullContent = (match.get("fullcontent") or "").strip()
380
  full_result_text += f"{fullContent}"
381
+ hpbsnoidung = arr_to_str(match.get("hpbsnoidung"), sep="; ")
382
+ if hpbsnoidung:
383
+ full_result_text += f"\n- Hình phạt bổ sung: {hpbsnoidung}"
384
+ bpkpnoidung = arr_to_str(match.get("bpkpnoidung"), sep="; ")
385
+ if bpkpnoidung:
386
+ full_result_text += f"\n- Biện pháp khắc phục: {bpkpnoidung}"
387
+ if match.get("cr_impounding"):
388
+ full_result_text += f"\n- Tạm giữ phương tiện: 07 ngày"
389
+
390
  prompt = (
391
  "Bạn là một trợ lý pháp lý AI chuyên nghiệp. Nhiệm vụ của bạn là tổng hợp thông tin từ hai nguồn: **Lịch sử trò chuyện** và **Các đoạn luật liên quan** để đưa ra một câu trả lời duy nhất, liền mạch và tự nhiên cho người dùng.\n\n"
392
  "**QUY TẮC BẮT BUỘC:**\n"
 
399
  f"### Câu hỏi của người dùng:\n{question}\n\n"
400
  "### Trả lời:"
401
  )
402
+
403
+ asyncio.create_task(
404
+ self.facebook.send_message(
405
+ message=f"{get_random_message(SUMMARY_STATUS_MESSAGES)}"
406
+ )
407
+ )
408
+
409
  try:
410
  # Đã bỏ cấu hình tường minh để sử dụng cài đặt mặc định của thư viện Gemini.
411
  answer = await self.channel.llm.generate_text(prompt)
412
+
413
  if answer and answer.strip():
414
+ logger.info(
415
+ f"LLM trả về câu trả lời thành công: \n\tanswer: {_safe_truncate(answer)}"
416
+ )
417
  return answer.strip()
418
  else:
419
+ logger.warning(
420
+ "LLM trả về câu trả lời hợp lệ nhưng rỗng. Sẽ trả về tin nhắn xin lỗi."
421
+ )
422
  break
423
 
424
  except GeminiResponseError as e:
425
+ logger.error(
426
+ f"[LLM_RETRY] Lỗi nội dung từ Gemini, sẽ thử lại với ít tài liệu hơn. Lý do: {e}"
427
+ )
428
  if attempt < max_logical_retries:
429
+ asyncio.create_task(
430
+ self.facebook.send_message(
431
+ message=get_random_message(LLM_RETRY_WAIT_MESSAGES)
432
+ )
433
+ )
434
  continue
435
  else:
436
  logger.error(f"[LLM_RETRY] Đã hết số lần thử lại logic. Thất bại.")
437
  break
438
 
439
  except Exception as e:
440
+ logger.error(
441
+ f"LLM không sẵn sàng sau tất cả các lần thử lại: {e}\n{traceback.format_exc()}"
442
+ )
443
  break
444
 
445
  # Fallback message if all attempts fail
446
+ logger.error(
447
+ "Tất cả các lần gọi LLM đều thất bại. Trả về tin nhắn xin lỗi cho người dùng."
448
+ )
449
  return "Xin lỗi bạn, tôi đang gặp một chút trục trặc kỹ thuật trong việc tổng hợp câu trả lời. Bạn có thể vui lòng đặt lại câu hỏi hoặc thử lại sau một lát được không ạ?"
450
 
451
+ async def create_facebook_post(
452
+ self, page_token: str, sender_id: str, history: List[Dict[str, Any]]
453
+ ) -> str:
454
+ logger.info(
455
+ f"[MOCK] Creating Facebook post for sender_id={sender_id} with history={history}"
456
+ )
457
+ return "https://facebook.com/mock_post_url"
458
 
459
+ async def _search_and_rerank_task(
460
+ self, keyword: str, full_query_context: str, vehicle_keywords: List[str]
461
+ ) -> List[Dict[str, Any]]:
462
  """
463
  Hàm trợ giúp để thực hiện một tác vụ song song: query từ Supabase và sau đó rerank kết quả.
464
  LƯU Ý: Việc rerank cho mỗi luồng riêng lẻ có thể tốn kém và không hiệu quả về chất lượng kết quả cuối cùng.
465
  """
466
  try:
467
  logger.info(f"[SEARCH_RERANK_TASK] Bắt đầu tác vụ cho từ khóa: '{keyword}'")
468
+
469
  # 1. Query Supabase
470
  embedding = await self.channel.embedder.create_embedding(keyword)
471
  loop = asyncio.get_event_loop()
472
  match_count = get_settings().match_count
473
+
474
  matches = await loop.run_in_executor(
475
  None,
476
  lambda: self.channel.supabase.match_documents(
477
  embedding=embedding,
478
  match_count=match_count,
479
  user_question=keyword,
480
+ vehicle_keywords=vehicle_keywords,
481
+ ),
482
  )
483
+
484
  if not matches:
485
+ logger.info(
486
+ f"[SEARCH_RERANK_TASK] Không tìm thấy kết quả nào từ Supabase cho từ khóa: '{keyword}'"
487
+ )
488
  return []
489
 
490
+ logger.info(
491
+ f"[SEARCH_RERANK_TASK] Tìm thấy {len(matches)} kết quả. Bắt đầu rerank cho từ khóa: '{keyword}'"
492
+ )
493
 
494
  # 2. Rerank (Tạm thời bỏ qua theo logic code gốc, nhưng nếu bật sẽ chạy ở đây)
495
  # CẢNH BÁO: Bước này rất tốn kém và làm chậm hệ thống nếu chạy cho mỗi từ khóa.
496
  # Việc rerank nhiều lần sẽ làm tăng chi phí và có thể chạm giới hạn API.
497
+ reranked_matches = (
498
+ matches # Mặc định trả về kết quả gốc nếu rerank bị lỗi hoặc tắt
499
+ )
500
  try:
501
  # Sử dụng full_query_context để rerank sẽ cho kết quả tốt hơn là chỉ dùng keyword
502
+ settings = get_settings()
503
+ reranked = await self.channel.reranker.rerank(
504
+ keyword, matches, min_score=settings.rerank_min_score
505
+ )
506
  if reranked:
507
  reranked_matches = reranked
508
+ logger.info(
509
+ f"[SEARCH_RERANK_TASK] Rerank thành công cho từ khóa '{keyword}', còn lại {len(reranked_matches)} kết quả."
510
+ )
511
  except Exception as e:
512
+ logger.error(
513
+ f"[SEARCH_RERANK_TASK] Lỗi khi rerank cho từ khóa '{keyword}': {e}. Sử dụng kết quả gốc."
514
+ )
515
+
516
  return reranked_matches
517
  except Exception as e:
518
+ logger.error(
519
+ f"Lỗi trong tác vụ tìm kiếm rerank cho từ khóa '{keyword}': {e}"
520
+ )
521
+ return [] # Trả về danh sách rỗng để không làm hỏng luồng chung
522
 
523
  async def handle_muc_phat(self, conv, conversation_context, page_token, sender_id):
524
+ vehicle_str = conv.get("originalvehicle", "")
525
+ vehicle_keywords = vehicle_str.split(",") if vehicle_str else []
526
+ action_keywords_str = conv.get("originalaction", "")
527
+ question = conv.get("originalquestion", "")
528
 
529
  # Sửa lỗi: Tách các cụm từ khóa bằng separator ';;;' thay vì khoảng trắng.
530
  # Điều này đảm bảo mỗi từ khóa là một cụm từ hoàn chỉnh.
531
  # Lọc bỏ các chuỗi rỗng có thể xuất hiện nếu action_keywords_str rỗng.
532
+ tu_khoa_list = [
533
+ kw.strip() for kw in action_keywords_str.split(";;;") if kw.strip()
534
+ ]
535
 
536
  if not tu_khoa_list and not question:
537
+ return "Để tra cứu mức phạt, bạn vui lòng cung cấp hành vi vi phạm nhé."
538
 
539
  main_query_for_context = question or action_keywords_str
540
 
541
  try:
542
  # --- 1. Tạo và chạy song song các tác vụ Query -> Rerank ---
543
  search_terms = tu_khoa_list if tu_khoa_list else [main_query_for_context]
544
+ tasks = [
545
+ self._search_and_rerank_task(
546
+ term, main_query_for_context, vehicle_keywords
547
+ )
548
+ for term in search_terms
549
+ ]
550
+ asyncio.create_task(
551
+ self.facebook.send_message(
552
+ message=get_random_message(FOUND_REGULATIONS_MESSAGES)
553
+ )
554
+ )
555
  list_of_reranked_results = await asyncio.gather(*tasks)
556
+
557
  # --- 2. Tổng hợp và loại bỏ kết quả trùng lặp ---
558
  combined_matches = []
559
  seen_ids = set()
560
  for reranked_list in list_of_reranked_results:
561
  for match in reranked_list:
562
+ match_id = match.get("doc_id")
563
  if match_id and match_id not in seen_ids:
564
  combined_matches.append(match)
565
  seen_ids.add(match_id)
566
+
567
+ logger.info(
568
+ f"Tổng hợp được {len(combined_matches)} văn bản duy nhất từ các tác vụ song song."
569
+ )
570
 
571
  # --- 3. Tạo câu trả lời ---
572
  if combined_matches:
573
+ response = await self.format_search_results(
574
+ conversation_context,
575
+ main_query_for_context,
576
+ combined_matches,
577
+ page_token,
578
+ sender_id,
579
+ )
580
  else:
581
  response = "Xin lỗi, tôi không tìm thấy thông tin phù hợp với hành vi bạn mô tả."
582
  except Exception as e:
583
  logger.error(f"Lỗi khi tra cứu mức phạt: {e}\n{traceback.format_exc()}")
584
  response = "Đã có lỗi xảy ra trong quá trình tra cứu. Vui lòng thử lại sau."
585
+
586
+ conv["isdone"] = True
587
  return response
588
 
589
+ async def _handle_general_question(
590
+ self, conversation_context: str, message_text: str, topic: str
591
+ ) -> str:
592
  prompt = (
593
  "Bạn là một trợ lý AI am hiểu về luật giao thông Việt Nam. "
594
  "Dựa vào lịch sử trò chuyện và kiến thức của bạn, hãy trả lời câu hỏi của người dùng một cách rõ ràng, ngắn gọn và chính xác.\n"
 
602
  if answer and answer.strip():
603
  return answer.strip()
604
  # If LLM returns an empty answer, provide a generic response.
605
+ logger.warning(
606
+ f"LLM returned an empty answer for general question on topic: {topic}"
607
+ )
608
  return "Cảm ơn bạn đã hỏi, nhưng tôi chưa có thông tin về vấn đề này. Bạn có thể hỏi câu khác được không?"
609
  except (GeminiResponseError, Exception) as e:
610
  logger.error(f"Error handling general question on topic '{topic}': {e}")
611
  return "Xin lỗi bạn, tôi đang gặp một chút trục trặc kỹ thuật và chưa thể trả lời câu hỏi này. Bạn vui lòng thử lại sau một lát nhé."
612
 
613
  async def handle_khac(self, conv, conversation_context, page_token, sender_id):
614
+ conv["isdone"] = True
615
+ return await self._handle_general_question(
616
+ conversation_context, conv["originaltext"], "một vấn đề khác"
617
+ )
618
 
619
  async def handle_quy_tac(self, conv, conversation_context, page_token, sender_id):
620
+ conv["isdone"] = True
621
+ return await self.handle_muc_phat(
622
+ conv, conversation_context, page_token, sender_id
623
+ )
624
 
625
  async def handle_bao_hieu(self, conv, conversation_context, page_token, sender_id):
626
+ conv["isdone"] = True
627
+ return await self.handle_muc_phat(
628
+ conv, conversation_context, page_token, sender_id
629
+ )
630
 
631
  async def handle_quy_trinh(self, conv, conversation_context, page_token, sender_id):
632
+ conv["isdone"] = True
633
+ return await self.handle_muc_phat(
634
+ conv, conversation_context, page_token, sender_id
635
+ )
636
 
637
  async def handle_ca_nhan(self, conv, conversation_context, page_token, sender_id):
638
  prompt = (
639
  "Biết rằng bạn đã có lịch sử trao đổi như sau:"
640
  f"Lịch sử:\n{conversation_context}\n\n"
641
  'Với các thông tin sau: "Bạn có tên là WeThoong AI, là trợ lý giao thông thông minh. Bạn được anh Viet Cat tạo ra và facebook cá nhân của anh ấy là https://facebook.com/vietcat". '
642
+ "Không được trả lời bạn là AI của Google, OpenAI, hay bất kỳ hãng nào khác. "
643
+ "Hãy trả lời thông minh, hài hước, ngắn gọn cho câu hỏi sau:\n"
644
  f'Câu hỏi:\n"{conv["originaltext"]}"'
645
  )
646
  try:
647
  answer = await self.channel.llm.generate_text(prompt)
648
+ conv["isdone"] = True
649
+ return (
650
+ answer.strip()
651
+ if answer and answer.strip()
652
+ else "Chào bạn, mình là WeThoong AI đây!"
653
+ )
654
  except Exception as e:
655
  logger.error(f"Lỗi khi xử lý câu hỏi cá nhân: {e}")
656
+ return (
657
+ "Chào bạn, mình là WeThoong AI, trợ lý giao thông thông minh của bạn!"
658
+ )
app/reranker.py CHANGED
@@ -68,13 +68,15 @@ class Reranker:
68
 
69
  logger.info(f"[RERANK] Cleaned cache: removed {len(keys_to_remove)} old entries")
70
 
71
- def _get_cached_result(self, cache_key: str, top_k: int) -> List[Dict]:
72
  """Lấy kết quả từ cache nếu có và còn hợp lệ."""
73
  if cache_key in self._rerank_cache:
74
  current_time = time.time()
75
  if current_time - self._cache_timestamps.get(cache_key, 0) <= self._cache_ttl:
76
- cached_result = self._rerank_cache[cache_key][:top_k]
77
- logger.info(f"[RERANK] Cache hit for query, returning {len(cached_result)} cached results")
 
 
78
  return cached_result
79
  else:
80
  # Cache đã hết hạn, xóa
@@ -206,24 +208,24 @@ class Reranker:
206
  return doc
207
 
208
  @timing_decorator_async
209
- async def rerank(self, query: str, docs: List[Dict], top_k: int = 5) -> List[Dict]:
210
  """
211
- Rerank docs theo độ liên quan với query, trả về top_k docs.
212
  Sử dụng batch processing và caching để tối ưu hiệu suất.
213
  """
214
- logger.info(f"[RERANK] Start rerank for query: {query} | docs: {len(docs)} | top_k: {top_k}")
215
 
216
  if not docs:
217
  return []
218
 
219
  # Kiểm tra cache trước
220
  cache_key = self._get_cache_key(query, docs)
221
- cached_result = self._get_cached_result(cache_key, top_k)
222
 
223
  if cached_result:
224
  return cached_result
225
 
226
- # Giới hạn số lượng docs để rerank - chỉ rerank top 15 docs có similarity cao nhất
227
  max_docs_to_rerank = self.max_docs_to_rerank
228
  docs_to_rerank = docs[:max_docs_to_rerank]
229
  logger.info(f"[RERANK] Will rerank {len(docs_to_rerank)} docs (limited to top {max_docs_to_rerank})")
@@ -245,12 +247,14 @@ class Reranker:
245
  doc['rerank_score'] = 0
246
  scored.append(doc)
247
 
248
- # Sort theo score và trả về top_k
249
- scored = sorted(scored, key=lambda x: x['rerank_score'], reverse=True)
250
- result = scored[:top_k]
251
 
252
- # Cache kết quả với system mới
 
 
 
253
  self._set_cached_result(cache_key, scored)
254
 
255
- logger.info(f"[RERANK] Top reranked docs: {result[:2]}...{result[-2:]}")
256
- return result
 
68
 
69
  logger.info(f"[RERANK] Cleaned cache: removed {len(keys_to_remove)} old entries")
70
 
71
+ def _get_cached_result(self, cache_key: str, min_score: float) -> List[Dict]:
72
  """Lấy kết quả từ cache nếu có và còn hợp lệ."""
73
  if cache_key in self._rerank_cache:
74
  current_time = time.time()
75
  if current_time - self._cache_timestamps.get(cache_key, 0) <= self._cache_ttl:
76
+ # Lọc theo điểm thay vì lấy top_k
77
+ cached_docs = self._rerank_cache[cache_key]
78
+ cached_result = [doc for doc in cached_docs if doc.get('rerank_score', 0) >= min_score]
79
+ logger.info(f"[RERANK] Cache hit for query, returning {len(cached_result)} cached results with score >= {min_score}")
80
  return cached_result
81
  else:
82
  # Cache đã hết hạn, xóa
 
208
  return doc
209
 
210
  @timing_decorator_async
211
+ async def rerank(self, query: str, docs: List[Dict], min_score: float = 7.0) -> List[Dict]:
212
  """
213
+ Rerank docs theo độ liên quan với query, trả về các docs có điểm >= min_score.
214
  Sử dụng batch processing và caching để tối ưu hiệu suất.
215
  """
216
+ logger.info(f"[RERANK] Start rerank for query: {query} | docs: {len(docs)} | min_score: {min_score}")
217
 
218
  if not docs:
219
  return []
220
 
221
  # Kiểm tra cache trước
222
  cache_key = self._get_cache_key(query, docs)
223
+ cached_result = self._get_cached_result(cache_key, min_score)
224
 
225
  if cached_result:
226
  return cached_result
227
 
228
+ # Giới hạn số lượng docs để rerank - chỉ rerank top N docs có similarity cao nhất
229
  max_docs_to_rerank = self.max_docs_to_rerank
230
  docs_to_rerank = docs[:max_docs_to_rerank]
231
  logger.info(f"[RERANK] Will rerank {len(docs_to_rerank)} docs (limited to top {max_docs_to_rerank})")
 
247
  doc['rerank_score'] = 0
248
  scored.append(doc)
249
 
250
+ # Sort theo score
251
+ scored = sorted(scored, key=lambda x: x.get('rerank_score', 0), reverse=True)
 
252
 
253
+ # Lọc theo min_score
254
+ result = [doc for doc in scored if doc.get('rerank_score', 0) >= min_score]
255
+
256
+ # Cache kết quả đã được chấm điểm (toàn bộ, trước khi lọc)
257
  self._set_cached_result(cache_key, scored)
258
 
259
+ logger.info(f"[RERANK] Found {len(result)} docs with score >= {min_score}. Top results: {result[:2]}...{result[-2:] if len(result) > 2 else ''}")
260
+ return result