quick fix timestamp
Browse files- app/message_processor.py +111 -189
- app/sheets.py +107 -69
app/message_processor.py
CHANGED
|
@@ -1,7 +1,7 @@
|
|
| 1 |
from typing import Dict, Any, List, Optional
|
| 2 |
import asyncio
|
| 3 |
import traceback
|
| 4 |
-
import json
|
| 5 |
from loguru import logger
|
| 6 |
from .constants import SUMMARY_STATUS_MESSAGES, PROCESSING_STATUS_MESSAGES, FOUND_REGULATIONS_MESSAGES, BATCH_STATUS_MESSAGES
|
| 7 |
from .utils import get_random_message
|
|
@@ -22,8 +22,6 @@ class MessageProcessor:
|
|
| 22 |
)
|
| 23 |
|
| 24 |
async def process_message(self, message_data: Dict[str, Any]):
|
| 25 |
-
# Refactor logic từ main.py vào đây
|
| 26 |
-
# Lưu ý: self.channel.supabase, self.channel.llm, ...
|
| 27 |
if not message_data or not isinstance(message_data, dict):
|
| 28 |
logger.error(f"[ERROR] Invalid message_data: {message_data}")
|
| 29 |
return
|
|
@@ -39,12 +37,10 @@ class MessageProcessor:
|
|
| 39 |
attachments = message_data.get('attachments', [])
|
| 40 |
logger.bind(user_id=sender_id, page_id=page_id, message=message_text).info("Processing message")
|
| 41 |
|
| 42 |
-
# Nếu không có message_text và attachments, không xử lý
|
| 43 |
if not message_text and not attachments:
|
| 44 |
logger.info(f"[DEBUG] Không có message_text và attachments, không xử lý...")
|
| 45 |
return
|
| 46 |
|
| 47 |
-
# Lấy toàn bộ history (không lọc isdone)
|
| 48 |
loop = asyncio.get_event_loop()
|
| 49 |
sheets_client = self.channel.get_sheets_client()
|
| 50 |
history = await loop.run_in_executor(
|
|
@@ -52,13 +48,16 @@ class MessageProcessor:
|
|
| 52 |
)
|
| 53 |
logger.info(f"[DEBUG] history: {history}")
|
| 54 |
|
| 55 |
-
#
|
|
|
|
| 56 |
for row in history:
|
| 57 |
-
|
| 58 |
-
|
| 59 |
-
|
|
|
|
|
|
|
| 60 |
|
| 61 |
-
#
|
| 62 |
log_kwargs = {
|
| 63 |
'conversation_id': None,
|
| 64 |
'recipient_id': sender_id,
|
|
@@ -77,31 +76,27 @@ class MessageProcessor:
|
|
| 77 |
}
|
| 78 |
|
| 79 |
logger.info(f"[DEBUG] Message cơ bản: {log_kwargs}")
|
| 80 |
-
conv = None
|
| 81 |
-
|
| 82 |
conv = await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**log_kwargs))
|
| 83 |
if not conv:
|
| 84 |
logger.error("Không thể tạo conversation mới!")
|
| 85 |
return
|
| 86 |
-
logger.info(f"[DEBUG] Message history: {conv}")
|
| 87 |
|
| 88 |
-
# Thêm timestamp mới nếu chưa có
|
| 89 |
conv['timestamp'] = self.flatten_timestamp(conv['timestamp'])
|
| 90 |
if timestamp not in conv['timestamp']:
|
| 91 |
conv['timestamp'].append(timestamp)
|
| 92 |
-
|
| 93 |
-
|
|
|
|
|
|
|
|
|
|
| 94 |
|
| 95 |
-
# Get page access token (cache)
|
| 96 |
page_token = self.channel.get_page_token()
|
| 97 |
-
|
| 98 |
-
if page_token:
|
| 99 |
-
logger.info(f"[DEBUG] page_token: {page_token[:10]} ... {page_token[-10:]}")
|
| 100 |
-
else:
|
| 101 |
-
logger.info(f"[DEBUG] page_token: None")
|
| 102 |
logger.error(f"No access token found for page {message_data['page_id']}")
|
| 103 |
return
|
| 104 |
-
|
| 105 |
try:
|
| 106 |
await self.facebook.send_message(message=get_random_message(PROCESSING_STATUS_MESSAGES))
|
| 107 |
except Exception as e:
|
|
@@ -109,19 +104,17 @@ class MessageProcessor:
|
|
| 109 |
logger.warning("[FACEBOOK] Token expired, invalidate and refresh")
|
| 110 |
self.channel.invalidate_page_token()
|
| 111 |
page_token = self.channel.get_page_token(force_refresh=True)
|
| 112 |
-
# Có thể update lại page_token cho self.facebook nếu cần
|
| 113 |
self.facebook.page_token = page_token
|
| 114 |
-
# await self.facebook.send_message(message="Ok, để mình check. Bạn chờ mình chút xíu nhé!")
|
| 115 |
else:
|
| 116 |
raise
|
| 117 |
|
| 118 |
-
# Extract command and keywords
|
| 119 |
from app.utils import extract_command, extract_keywords
|
| 120 |
from app.constants import VEHICLE_KEYWORDS
|
| 121 |
command, remaining_text = extract_command(message_text)
|
| 122 |
-
|
| 123 |
llm_analysis = await self.channel.llm.analyze(message_text)
|
| 124 |
logger.info(f"[LLM][RAW] Kết quả trả về từ analyze: {llm_analysis}")
|
|
|
|
| 125 |
muc_dich = None
|
| 126 |
hanh_vi = None
|
| 127 |
cau_hoi = None
|
|
@@ -141,82 +134,47 @@ class MessageProcessor:
|
|
| 141 |
for kw in keywords:
|
| 142 |
cau_hoi = cau_hoi.replace(kw, "")
|
| 143 |
cau_hoi = cau_hoi.strip()
|
|
|
|
| 144 |
logger.info(f"[DEBUG] Phương tiện: {keywords} - Hành vi: {hanh_vi} - Mục đích: {muc_dich} - Câu hỏi: {cau_hoi}")
|
| 145 |
-
# await self.channel.facebook.send_message(message=f"... đang tìm kiếm quy định liên quan đến {hanh_vi_vi_pham} .....")
|
| 146 |
-
# 4. Update lại conversation với thông tin đầy đủ
|
| 147 |
-
update_kwargs = {
|
| 148 |
-
'conversation_id': conv['conversation_id'],
|
| 149 |
-
'recipient_id': sender_id,
|
| 150 |
-
'page_id': page_id,
|
| 151 |
-
'originaltext': message_text,
|
| 152 |
-
'originalcommand': command,
|
| 153 |
-
'originalcontent': remaining_text,
|
| 154 |
-
'originalattachments': attachments,
|
| 155 |
-
'originalvehicle': ','.join(keywords),
|
| 156 |
-
'originalaction': hanh_vi,
|
| 157 |
-
'originalpurpose': muc_dich,
|
| 158 |
-
'originalquestion': cau_hoi or "",
|
| 159 |
-
'systemresponse': conv.get('systemresponse', ''),
|
| 160 |
-
'timestamp': self.flatten_timestamp(conv['timestamp']),
|
| 161 |
-
'isdone': False
|
| 162 |
-
}
|
| 163 |
-
for key, value in update_kwargs.items():
|
| 164 |
-
if value not in (None, "", []) and conv.get(key) in (None, "", []):
|
| 165 |
-
conv[key] = value
|
| 166 |
-
logger.info(f"[DEBUG] Message history update cuối cùng: {conv}")
|
| 167 |
|
| 168 |
-
#
|
| 169 |
-
|
| 170 |
-
|
| 171 |
-
|
| 172 |
-
|
| 173 |
-
|
| 174 |
-
|
|
|
|
|
|
|
| 175 |
logger.info(f"[DEBUG] Định hướng mục đích xử lý: {muc_dich_to_use}")
|
| 176 |
|
| 177 |
-
# Tin nhắn không có command: lấy toàn bộ history để truyền vào LLM
|
| 178 |
-
# Chuẩn bị context hội thoại cho LLM
|
| 179 |
MAX_CONTEXT_CHARS = 20_000
|
| 180 |
conversation_context = []
|
| 181 |
total_chars = 0
|
| 182 |
|
| 183 |
-
# <<< SỬA LỖI TẠI ĐÂY >>>
|
| 184 |
def get_latest_timestamp(ts_value):
|
| 185 |
-
if isinstance(ts_value, (int, float)):
|
| 186 |
-
return int(ts_value)
|
| 187 |
if isinstance(ts_value, str):
|
| 188 |
-
try:
|
| 189 |
-
|
| 190 |
-
|
| 191 |
-
|
| 192 |
-
return int(ts_value)
|
| 193 |
-
except (ValueError, TypeError):
|
| 194 |
-
return 0
|
| 195 |
if isinstance(ts_value, list):
|
| 196 |
if not ts_value: return 0
|
| 197 |
-
|
| 198 |
-
return max(all_timestamps) if all_timestamps else 0
|
| 199 |
return 0
|
|
|
|
| 200 |
sorted_history = sorted(history, key=lambda row: get_latest_timestamp(row.get('timestamp', 0)))
|
| 201 |
|
| 202 |
-
# Bước 2: Duyệt từ mới -> cũ để loại bỏ message cũ nếu cần
|
| 203 |
for row in reversed(sorted_history):
|
| 204 |
temp_blocks = []
|
| 205 |
-
if row.get('systemresponse'):
|
| 206 |
-
|
| 207 |
-
if row.get('originaltext'):
|
| 208 |
-
temp_blocks.append({"role": "user", "content": row['originaltext']})
|
| 209 |
-
|
| 210 |
temp_total = sum(len(block['content']) for block in temp_blocks)
|
| 211 |
-
|
| 212 |
-
if total_chars + temp_total > MAX_CONTEXT_CHARS:
|
| 213 |
-
continue # bỏ qua những block quá cũ
|
| 214 |
-
|
| 215 |
-
# prepend để đảm bảo thứ tự cuối cùng là từ cũ đến mới
|
| 216 |
conversation_context = temp_blocks + conversation_context
|
| 217 |
total_chars += temp_total
|
| 218 |
|
| 219 |
-
|
| 220 |
response = None
|
| 221 |
if not command:
|
| 222 |
if muc_dich_to_use == "hỏi về mức phạt":
|
|
@@ -232,7 +190,6 @@ class MessageProcessor:
|
|
| 232 |
else:
|
| 233 |
response = await self.handle_khac(conv, conversation_context, message_text)
|
| 234 |
else:
|
| 235 |
-
# Có command
|
| 236 |
if command == "xong":
|
| 237 |
post_url = await self.create_facebook_post(page_token, conv['recipient_id'], [conv])
|
| 238 |
if post_url:
|
|
@@ -245,8 +202,10 @@ class MessageProcessor:
|
|
| 245 |
conv['isdone'] = False
|
| 246 |
|
| 247 |
await self.facebook.send_message(message=response)
|
| 248 |
-
|
| 249 |
conv['systemresponse'] = response
|
|
|
|
|
|
|
| 250 |
await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**conv))
|
| 251 |
return
|
| 252 |
|
|
@@ -283,45 +242,26 @@ class MessageProcessor:
|
|
| 283 |
matches = reranked
|
| 284 |
except Exception as e:
|
| 285 |
logger.error(f"[RERANK] Lỗi khi rerank: {e}")
|
| 286 |
-
|
| 287 |
-
top_result_text = ""
|
| 288 |
full_result_text = ""
|
| 289 |
def arr_to_str(arr, sep=", "):
|
| 290 |
-
if not arr:
|
| 291 |
-
|
| 292 |
-
|
| 293 |
-
return sep.join([str(x) for x in arr if x not in (None, "")])
|
| 294 |
-
return str(arr)
|
| 295 |
for i, match in enumerate(matches, 1):
|
| 296 |
-
|
| 297 |
-
top = match
|
| 298 |
-
full_result_text += f"\n{(match.get('structure') or '').strip()}:\n"
|
| 299 |
fullContent = (match.get('fullcontent') or '').strip()
|
| 300 |
full_result_text += f"{fullContent}"
|
| 301 |
hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ")
|
| 302 |
if hpbsnoidung:
|
| 303 |
-
full_result_text += f"\
|
| 304 |
bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ")
|
| 305 |
if bpkpnoidung:
|
| 306 |
-
full_result_text += f"\
|
| 307 |
impounding = match.get('impounding')
|
| 308 |
if impounding:
|
| 309 |
-
full_result_text += f"\
|
| 310 |
-
|
| 311 |
-
top_result_text += f"\n{(match.get('structure') or '').strip()}:\n"
|
| 312 |
-
fullContent = (match.get('fullcontent') or '').strip()
|
| 313 |
-
top_result_text += f"{fullContent}"
|
| 314 |
-
hpbsnoidung = arr_to_str(top.get('hpbsnoidung'), sep="; ")
|
| 315 |
-
if hpbsnoidung:
|
| 316 |
-
top_result_text += f"\nNgoài việc bị phạt tiền, người vi phạm còn bị: {hpbsnoidung}"
|
| 317 |
-
bpkpnoidung = arr_to_str(top.get('bpkpnoidung'), sep="; ")
|
| 318 |
-
if bpkpnoidung:
|
| 319 |
-
top_result_text += f"\nNgoài ra, người vi phạm còn bị buộc: {bpkpnoidung}"
|
| 320 |
-
impounding = top.get('impounding')
|
| 321 |
-
if impounding:
|
| 322 |
-
top_result_text += f"\nTạm giữ phương tiên: 07 ngày"
|
| 323 |
-
else:
|
| 324 |
-
result_text = "Không có kết quả phù hợp!"
|
| 325 |
prompt = (
|
| 326 |
"Bạn là một trợ lý pháp lý AI. Dưới đây là một số đoạn trích từ các văn bản pháp luật có liên quan.\n"
|
| 327 |
"Hãy sử dụng **duy nhất lịch sử trao đổi và thông tin trong các đoạn luật dưới đây** để trả lời câu hỏi bên dưới.\n"
|
|
@@ -329,13 +269,10 @@ class MessageProcessor:
|
|
| 329 |
"- Nếu không đủ thông tin để trả lời, hãy nói rõ.\n"
|
| 330 |
"- Trả lời ngắn gọn, rõ ràng, dễ hiểu.\n"
|
| 331 |
"- Nếu cần, hãy trích dẫn đoạn liên quan (ghi rõ số hiệu, điều khoản nếu có).\n"
|
| 332 |
-
f"### Lịch sử:\n"
|
| 333 |
-
f"{
|
| 334 |
-
"
|
| 335 |
-
|
| 336 |
-
"\n\n### Câu hỏi của người dùng:\n"
|
| 337 |
-
f"{question}"
|
| 338 |
-
"\n\n### Trả lời:"
|
| 339 |
)
|
| 340 |
await self.facebook.send_message(message=f"{get_random_message(SUMMARY_STATUS_MESSAGES)}")
|
| 341 |
try:
|
|
@@ -347,111 +284,96 @@ class MessageProcessor:
|
|
| 347 |
logger.error(f"LLM không trả về câu trả lời phù hợp: \n\tanswer: {answer}")
|
| 348 |
except Exception as e:
|
| 349 |
logger.error(f"LLM không sẵn sàng: {e}\n{traceback.format_exc()}")
|
| 350 |
-
|
| 351 |
-
|
| 352 |
-
|
| 353 |
-
fullContent = (match.get('fullcontent') or '').strip()
|
| 354 |
-
fallback += f"{fullContent}"
|
| 355 |
-
hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ")
|
| 356 |
-
if hpbsnoidung:
|
| 357 |
-
fallback += f" - Hình phạt bổ sung: {hpbsnoidung}\n"
|
| 358 |
-
bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ")
|
| 359 |
-
if bpkpnoidung:
|
| 360 |
-
fallback += f" - Biện pháp khắc phục hậu quả: {bpkpnoidung}\n"
|
| 361 |
-
impounding = match.get('impounding')
|
| 362 |
-
if impounding:
|
| 363 |
-
fallback += f"\nTạm giữ phương tiên: 07 ngày"
|
| 364 |
-
fallback += "\n"
|
| 365 |
-
return fallback.strip()
|
| 366 |
|
| 367 |
async def create_facebook_post(self, page_token: str, sender_id: str, history: List[Dict[str, Any]]) -> str:
|
| 368 |
logger.info(f"[MOCK] Creating Facebook post for sender_id={sender_id} with history={history}")
|
|
|
|
|
|
|
| 369 |
return "https://facebook.com/mock_post_url"
|
| 370 |
|
| 371 |
async def handle_muc_phat(self, conv, conversation_context, page_token, sender_id):
|
| 372 |
vehicle = conv.get('originalvehicle', '')
|
| 373 |
action = conv.get('originalaction', '')
|
| 374 |
question = conv.get('originalquestion', '')
|
| 375 |
-
|
| 376 |
-
if question:
|
| 377 |
-
|
| 378 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 379 |
logger.info(f"[DEBUG] embedding: {embedding[:5]} ... (total {len(embedding)})")
|
| 380 |
-
|
| 381 |
match_count = get_settings().match_count
|
| 382 |
matches = self.channel.supabase.match_documents(
|
| 383 |
embedding,
|
| 384 |
match_count=match_count,
|
| 385 |
-
user_question=
|
| 386 |
)
|
| 387 |
logger.info(f"[DEBUG] matches: {matches}")
|
| 388 |
if matches:
|
| 389 |
-
response = await self.format_search_results(conversation_context, question, matches, page_token, sender_id)
|
| 390 |
else:
|
| 391 |
-
response = "Xin lỗi, tôi không tìm thấy thông tin phù hợp
|
| 392 |
-
|
| 393 |
-
logger.
|
| 394 |
-
response = "
|
|
|
|
| 395 |
conv['isdone'] = True
|
| 396 |
return response
|
| 397 |
|
| 398 |
-
async def
|
|
|
|
| 399 |
prompt = (
|
| 400 |
-
"
|
| 401 |
-
|
| 402 |
-
"
|
| 403 |
-
"\n
|
| 404 |
-
f"
|
|
|
|
| 405 |
)
|
| 406 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 407 |
conv['isdone'] = True
|
| 408 |
-
return
|
|
|
|
|
|
|
|
|
|
|
|
|
| 409 |
|
| 410 |
async def handle_bao_hieu(self, conv, conversation_context, message_text):
|
| 411 |
-
prompt = (
|
| 412 |
-
"Biết rằng bạn đã có lịch sử trao đổi như sau:"
|
| 413 |
-
f"Lịch sử:\n{conversation_context}"
|
| 414 |
-
"Bạn là một trợ lý AI có kiến thức pháp luật, hãy trả lời câu hỏi dựa trên lịch sử trao đổi"
|
| 415 |
-
"\n\nHãy trả lời ngắn gọn, dễ hiểu, trích dẫn rõ ràng thông tin từ các đoạn luật nếu cần."
|
| 416 |
-
f"\n\nCâu hỏi của người dùng: {message_text}\n"
|
| 417 |
-
)
|
| 418 |
-
answer = await self.channel.llm.generate_text(message_text)
|
| 419 |
conv['isdone'] = True
|
| 420 |
-
return
|
| 421 |
|
| 422 |
async def handle_quy_trinh(self, conv, conversation_context, message_text):
|
| 423 |
-
prompt = (
|
| 424 |
-
"Biết rằng bạn đã có lịch sử trao đổi như sau:"
|
| 425 |
-
f"Lịch sử:\n{conversation_context}"
|
| 426 |
-
"Bạn là một trợ lý AI có kiến thức pháp luật, hãy trả lời câu hỏi dựa trên lịch sử trao đổi"
|
| 427 |
-
"\n\nHãy trả lời ngắn gọn, dễ hiểu, trích dẫn rõ ràng thông tin từ các đoạn luật nếu cần."
|
| 428 |
-
f"\n\nCâu hỏi của người dùng: {message_text}\n"
|
| 429 |
-
)
|
| 430 |
-
answer = await self.channel.llm.generate_text(message_text)
|
| 431 |
conv['isdone'] = True
|
| 432 |
-
return
|
| 433 |
|
| 434 |
async def handle_ca_nhan(self, conv, conversation_context, message_text):
|
| 435 |
prompt = (
|
| 436 |
"Biết rằng bạn đã có lịch sử trao đổi như sau:"
|
| 437 |
-
f"Lịch sử:\n{conversation_context}"
|
| 438 |
-
'Với các thông tin sau: "Bạn có tên là WeThoong AI, là trợ lý giao thông thông minh. Bạn được anh Viet Cat tạo ra và facebook cá nhân của anh
|
| 439 |
'Không được trả lời bạn là AI của Google, OpenAI, hay bất kỳ hãng nào khác. '
|
| 440 |
'Hãy trả lời thông minh, hài hước, ngắn gọn cho câu hỏi sau:\n'
|
| 441 |
f'Câu hỏi:\n"{message_text}"'
|
| 442 |
)
|
| 443 |
-
|
| 444 |
-
|
| 445 |
-
|
| 446 |
-
|
| 447 |
-
|
| 448 |
-
|
| 449 |
-
"
|
| 450 |
-
f"Lịch sử:\n{conversation_context}"
|
| 451 |
-
"Bạn là một trợ lý AI có kiến thức pháp luật, hãy trả lời câu hỏi dựa trên lịch sử trao đổi"
|
| 452 |
-
"\n\nHãy trả lời ngắn gọn, dễ hiểu, trích dẫn rõ ràng thông tin từ các đoạn luật nếu cần."
|
| 453 |
-
f"\n\nCâu hỏi của người dùng: {message_text}\n"
|
| 454 |
-
)
|
| 455 |
-
answer = await self.channel.llm.generate_text(message_text)
|
| 456 |
-
conv['isdone'] = True
|
| 457 |
-
return answer.strip() if answer and answer.strip() else "[Đang phát triển] Tính năng này sẽ sớm có mặt."
|
|
|
|
| 1 |
from typing import Dict, Any, List, Optional
|
| 2 |
import asyncio
|
| 3 |
import traceback
|
| 4 |
+
import json
|
| 5 |
from loguru import logger
|
| 6 |
from .constants import SUMMARY_STATUS_MESSAGES, PROCESSING_STATUS_MESSAGES, FOUND_REGULATIONS_MESSAGES, BATCH_STATUS_MESSAGES
|
| 7 |
from .utils import get_random_message
|
|
|
|
| 22 |
)
|
| 23 |
|
| 24 |
async def process_message(self, message_data: Dict[str, Any]):
|
|
|
|
|
|
|
| 25 |
if not message_data or not isinstance(message_data, dict):
|
| 26 |
logger.error(f"[ERROR] Invalid message_data: {message_data}")
|
| 27 |
return
|
|
|
|
| 37 |
attachments = message_data.get('attachments', [])
|
| 38 |
logger.bind(user_id=sender_id, page_id=page_id, message=message_text).info("Processing message")
|
| 39 |
|
|
|
|
| 40 |
if not message_text and not attachments:
|
| 41 |
logger.info(f"[DEBUG] Không có message_text và attachments, không xử lý...")
|
| 42 |
return
|
| 43 |
|
|
|
|
| 44 |
loop = asyncio.get_event_loop()
|
| 45 |
sheets_client = self.channel.get_sheets_client()
|
| 46 |
history = await loop.run_in_executor(
|
|
|
|
| 48 |
)
|
| 49 |
logger.info(f"[DEBUG] history: {history}")
|
| 50 |
|
| 51 |
+
# --- SỬA LỖI LOGIC CHỐNG TRÙNG LẶP TẠI ĐÂY ---
|
| 52 |
+
# Kiểm tra xem timestamp của sự kiện webhook này đã tồn tại trong lịch sử chưa
|
| 53 |
for row in history:
|
| 54 |
+
# Chuyển đổi an toàn sang string để so sánh
|
| 55 |
+
sheet_timestamps = [str(ts) for ts in row.get('timestamp', [])]
|
| 56 |
+
if str(timestamp) in sheet_timestamps:
|
| 57 |
+
logger.warning(f"Webhook lặp lại cho sự kiện đã tồn tại (timestamp: {timestamp}). Bỏ qua.")
|
| 58 |
+
return # Bỏ qua hoàn toàn để tránh xử lý lại
|
| 59 |
|
| 60 |
+
# --- LUỒNG XỬ LÝ GỐC CỦA BẠN ĐƯỢC GIỮ NGUYÊN ---
|
| 61 |
log_kwargs = {
|
| 62 |
'conversation_id': None,
|
| 63 |
'recipient_id': sender_id,
|
|
|
|
| 76 |
}
|
| 77 |
|
| 78 |
logger.info(f"[DEBUG] Message cơ bản: {log_kwargs}")
|
|
|
|
|
|
|
| 79 |
conv = await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**log_kwargs))
|
| 80 |
if not conv:
|
| 81 |
logger.error("Không thể tạo conversation mới!")
|
| 82 |
return
|
| 83 |
+
logger.info(f"[DEBUG] Message history sau lần ghi đầu: {conv}")
|
| 84 |
|
| 85 |
+
# Thêm timestamp mới nếu chưa có (logic này có thể không cần thiết nữa nhưng giữ lại để không thay đổi luồng)
|
| 86 |
conv['timestamp'] = self.flatten_timestamp(conv['timestamp'])
|
| 87 |
if timestamp not in conv['timestamp']:
|
| 88 |
conv['timestamp'].append(timestamp)
|
| 89 |
+
|
| 90 |
+
# Lần gọi thứ 2 để cập nhật thêm thông tin ban đầu (nếu có)
|
| 91 |
+
conv_after_update1 = await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**conv))
|
| 92 |
+
if conv_after_update1:
|
| 93 |
+
conv = conv_after_update1
|
| 94 |
|
|
|
|
| 95 |
page_token = self.channel.get_page_token()
|
| 96 |
+
if not page_token:
|
|
|
|
|
|
|
|
|
|
|
|
|
| 97 |
logger.error(f"No access token found for page {message_data['page_id']}")
|
| 98 |
return
|
| 99 |
+
|
| 100 |
try:
|
| 101 |
await self.facebook.send_message(message=get_random_message(PROCESSING_STATUS_MESSAGES))
|
| 102 |
except Exception as e:
|
|
|
|
| 104 |
logger.warning("[FACEBOOK] Token expired, invalidate and refresh")
|
| 105 |
self.channel.invalidate_page_token()
|
| 106 |
page_token = self.channel.get_page_token(force_refresh=True)
|
|
|
|
| 107 |
self.facebook.page_token = page_token
|
|
|
|
| 108 |
else:
|
| 109 |
raise
|
| 110 |
|
|
|
|
| 111 |
from app.utils import extract_command, extract_keywords
|
| 112 |
from app.constants import VEHICLE_KEYWORDS
|
| 113 |
command, remaining_text = extract_command(message_text)
|
| 114 |
+
|
| 115 |
llm_analysis = await self.channel.llm.analyze(message_text)
|
| 116 |
logger.info(f"[LLM][RAW] Kết quả trả về từ analyze: {llm_analysis}")
|
| 117 |
+
|
| 118 |
muc_dich = None
|
| 119 |
hanh_vi = None
|
| 120 |
cau_hoi = None
|
|
|
|
| 134 |
for kw in keywords:
|
| 135 |
cau_hoi = cau_hoi.replace(kw, "")
|
| 136 |
cau_hoi = cau_hoi.strip()
|
| 137 |
+
|
| 138 |
logger.info(f"[DEBUG] Phương tiện: {keywords} - Hành vi: {hanh_vi} - Mục đích: {muc_dich} - Câu hỏi: {cau_hoi}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 139 |
|
| 140 |
+
# Hợp nhất dữ liệu đã phân tích vào `conv`
|
| 141 |
+
conv['originalcommand'] = command
|
| 142 |
+
conv['originalcontent'] = remaining_text
|
| 143 |
+
conv['originalvehicle'] = ','.join(keywords)
|
| 144 |
+
conv['originalaction'] = hanh_vi
|
| 145 |
+
conv['originalpurpose'] = muc_dich
|
| 146 |
+
conv['originalquestion'] = cau_hoi or ""
|
| 147 |
+
|
| 148 |
+
muc_dich_to_use = muc_dich or conv.get('originalpurpose')
|
| 149 |
logger.info(f"[DEBUG] Định hướng mục đích xử lý: {muc_dich_to_use}")
|
| 150 |
|
|
|
|
|
|
|
| 151 |
MAX_CONTEXT_CHARS = 20_000
|
| 152 |
conversation_context = []
|
| 153 |
total_chars = 0
|
| 154 |
|
|
|
|
| 155 |
def get_latest_timestamp(ts_value):
|
| 156 |
+
if isinstance(ts_value, (int, float)): return int(ts_value)
|
|
|
|
| 157 |
if isinstance(ts_value, str):
|
| 158 |
+
try: return int(json.loads(ts_value))
|
| 159 |
+
except:
|
| 160 |
+
try: return int(ts_value)
|
| 161 |
+
except: return 0
|
|
|
|
|
|
|
|
|
|
| 162 |
if isinstance(ts_value, list):
|
| 163 |
if not ts_value: return 0
|
| 164 |
+
return max([get_latest_timestamp(item) for item in ts_value]) if ts_value else 0
|
|
|
|
| 165 |
return 0
|
| 166 |
+
|
| 167 |
sorted_history = sorted(history, key=lambda row: get_latest_timestamp(row.get('timestamp', 0)))
|
| 168 |
|
|
|
|
| 169 |
for row in reversed(sorted_history):
|
| 170 |
temp_blocks = []
|
| 171 |
+
if row.get('systemresponse'): temp_blocks.append({"role": "assistant", "content": row['systemresponse']})
|
| 172 |
+
if row.get('originaltext'): temp_blocks.append({"role": "user", "content": row['originaltext']})
|
|
|
|
|
|
|
|
|
|
| 173 |
temp_total = sum(len(block['content']) for block in temp_blocks)
|
| 174 |
+
if total_chars + temp_total > MAX_CONTEXT_CHARS: continue
|
|
|
|
|
|
|
|
|
|
|
|
|
| 175 |
conversation_context = temp_blocks + conversation_context
|
| 176 |
total_chars += temp_total
|
| 177 |
|
|
|
|
| 178 |
response = None
|
| 179 |
if not command:
|
| 180 |
if muc_dich_to_use == "hỏi về mức phạt":
|
|
|
|
| 190 |
else:
|
| 191 |
response = await self.handle_khac(conv, conversation_context, message_text)
|
| 192 |
else:
|
|
|
|
| 193 |
if command == "xong":
|
| 194 |
post_url = await self.create_facebook_post(page_token, conv['recipient_id'], [conv])
|
| 195 |
if post_url:
|
|
|
|
| 202 |
conv['isdone'] = False
|
| 203 |
|
| 204 |
await self.facebook.send_message(message=response)
|
| 205 |
+
|
| 206 |
conv['systemresponse'] = response
|
| 207 |
+
|
| 208 |
+
logger.info(f"Chuẩn bị ghi/cập nhật dữ liệu cuối cùng vào sheet: {conv}")
|
| 209 |
await loop.run_in_executor(None, lambda: sheets_client.log_conversation(**conv))
|
| 210 |
return
|
| 211 |
|
|
|
|
| 242 |
matches = reranked
|
| 243 |
except Exception as e:
|
| 244 |
logger.error(f"[RERANK] Lỗi khi rerank: {e}")
|
| 245 |
+
|
|
|
|
| 246 |
full_result_text = ""
|
| 247 |
def arr_to_str(arr, sep=", "):
|
| 248 |
+
if not arr: return ""
|
| 249 |
+
return sep.join([str(x) for x in arr if x not in (None, "")]) if isinstance(arr, list) else str(arr)
|
| 250 |
+
|
|
|
|
|
|
|
| 251 |
for i, match in enumerate(matches, 1):
|
| 252 |
+
full_result_text += f"\n- Nguồn: {(match.get('structure') or '').strip()}:\n"
|
|
|
|
|
|
|
| 253 |
fullContent = (match.get('fullcontent') or '').strip()
|
| 254 |
full_result_text += f"{fullContent}"
|
| 255 |
hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ")
|
| 256 |
if hpbsnoidung:
|
| 257 |
+
full_result_text += f"\n- Hình phạt bổ sung: {hpbsnoidung}"
|
| 258 |
bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ")
|
| 259 |
if bpkpnoidung:
|
| 260 |
+
full_result_text += f"\n- Biện pháp khắc phục: {bpkpnoidung}"
|
| 261 |
impounding = match.get('impounding')
|
| 262 |
if impounding:
|
| 263 |
+
full_result_text += f"\n- Có thể tạm giữ phương tiện."
|
| 264 |
+
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 265 |
prompt = (
|
| 266 |
"Bạn là một trợ lý pháp lý AI. Dưới đây là một số đoạn trích từ các văn bản pháp luật có liên quan.\n"
|
| 267 |
"Hãy sử dụng **duy nhất lịch sử trao đổi và thông tin trong các đoạn luật dưới đây** để trả lời câu hỏi bên dưới.\n"
|
|
|
|
| 269 |
"- Nếu không đủ thông tin để trả lời, hãy nói rõ.\n"
|
| 270 |
"- Trả lời ngắn gọn, rõ ràng, dễ hiểu.\n"
|
| 271 |
"- Nếu cần, hãy trích dẫn đoạn liên quan (ghi rõ số hiệu, điều khoản nếu có).\n"
|
| 272 |
+
f"### Lịch sử:\n{conversation_context}\n"
|
| 273 |
+
f"### Các đoạn luật liên quan:\n{full_result_text}\n\n"
|
| 274 |
+
f"### Câu hỏi của người dùng:\n{question}\n\n"
|
| 275 |
+
"### Trả lời:"
|
|
|
|
|
|
|
|
|
|
| 276 |
)
|
| 277 |
await self.facebook.send_message(message=f"{get_random_message(SUMMARY_STATUS_MESSAGES)}")
|
| 278 |
try:
|
|
|
|
| 284 |
logger.error(f"LLM không trả về câu trả lời phù hợp: \n\tanswer: {answer}")
|
| 285 |
except Exception as e:
|
| 286 |
logger.error(f"LLM không sẵn sàng: {e}\n{traceback.format_exc()}")
|
| 287 |
+
|
| 288 |
+
# Fallback response
|
| 289 |
+
return "Dựa trên thông tin bạn cung cấp, tôi đã tìm thấy một số quy định liên quan. Tuy nhiên, tôi đang gặp chút khó khăn trong việc tóm tắt. Bạn vui lòng tham khảo nội dung chi tiết trong các văn bản luật nhé."
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 290 |
|
| 291 |
async def create_facebook_post(self, page_token: str, sender_id: str, history: List[Dict[str, Any]]) -> str:
|
| 292 |
logger.info(f"[MOCK] Creating Facebook post for sender_id={sender_id} with history={history}")
|
| 293 |
+
# In a real scenario, you would use the Facebook Graph API to create a post.
|
| 294 |
+
# This is a mock implementation.
|
| 295 |
return "https://facebook.com/mock_post_url"
|
| 296 |
|
| 297 |
async def handle_muc_phat(self, conv, conversation_context, page_token, sender_id):
|
| 298 |
vehicle = conv.get('originalvehicle', '')
|
| 299 |
action = conv.get('originalaction', '')
|
| 300 |
question = conv.get('originalquestion', '')
|
| 301 |
+
|
| 302 |
+
if not action and not question:
|
| 303 |
+
return "Để tra cứu mức phạt, bạn vui lòng cung cấp hành vi vi phạm nhé."
|
| 304 |
+
|
| 305 |
+
search_query = action or question
|
| 306 |
+
logger.info(f"[DEBUG] tạo embedding cho: '{search_query}'")
|
| 307 |
+
try:
|
| 308 |
+
embedding = await self.channel.embedder.create_embedding(search_query)
|
| 309 |
logger.info(f"[DEBUG] embedding: {embedding[:5]} ... (total {len(embedding)})")
|
| 310 |
+
|
| 311 |
match_count = get_settings().match_count
|
| 312 |
matches = self.channel.supabase.match_documents(
|
| 313 |
embedding,
|
| 314 |
match_count=match_count,
|
| 315 |
+
user_question=search_query
|
| 316 |
)
|
| 317 |
logger.info(f"[DEBUG] matches: {matches}")
|
| 318 |
if matches:
|
| 319 |
+
response = await self.format_search_results(conversation_context, question or action, matches, page_token, sender_id)
|
| 320 |
else:
|
| 321 |
+
response = "Xin lỗi, tôi không tìm thấy thông tin phù hợp với hành vi bạn mô tả."
|
| 322 |
+
except Exception as e:
|
| 323 |
+
logger.error(f"Lỗi khi tra cứu mức phạt: {e}")
|
| 324 |
+
response = "Đã có lỗi xảy ra trong quá trình tra cứu. Vui lòng thử lại sau."
|
| 325 |
+
|
| 326 |
conv['isdone'] = True
|
| 327 |
return response
|
| 328 |
|
| 329 |
+
async def _handle_general_question(self, conversation_context: str, message_text: str, topic: str) -> str:
|
| 330 |
+
"""Hàm chung để xử lý các câu hỏi kiến thức chung."""
|
| 331 |
prompt = (
|
| 332 |
+
"Bạn là một trợ lý AI am hiểu về luật giao thông Việt Nam. "
|
| 333 |
+
"Dựa vào lịch sử trò chuyện và kiến thức của bạn, hãy trả lời câu hỏi của người dùng một cách rõ ràng, ngắn gọn và chính xác.\n"
|
| 334 |
+
f"Chủ đề câu hỏi là về: {topic}\n"
|
| 335 |
+
f"### Lịch sử:\n{conversation_context}\n"
|
| 336 |
+
f"### Câu hỏi của người dùng:\n{message_text}\n"
|
| 337 |
+
"### Trả lời:"
|
| 338 |
)
|
| 339 |
+
try:
|
| 340 |
+
answer = await self.channel.llm.generate_text(prompt)
|
| 341 |
+
if answer and answer.strip():
|
| 342 |
+
return answer.strip()
|
| 343 |
+
return f"Tôi chưa có thông tin về câu hỏi của bạn liên quan đến {topic}."
|
| 344 |
+
except Exception as e:
|
| 345 |
+
logger.error(f"Lỗi khi xử lý chủ đề {topic}: {e}")
|
| 346 |
+
return f"Xin lỗi, tôi đang gặp sự cố khi xử lý câu hỏi về {topic}. Vui lòng thử lại sau."
|
| 347 |
+
|
| 348 |
+
async def handle_khac(self, conv, conversation_context, message_text):
|
| 349 |
conv['isdone'] = True
|
| 350 |
+
return await self._handle_general_question(conversation_context, message_text, "một vấn đề khác")
|
| 351 |
+
|
| 352 |
+
async def handle_quy_tac(self, conv, conversation_context, message_text):
|
| 353 |
+
conv['isdone'] = True
|
| 354 |
+
return await self._handle_general_question(conversation_context, message_text, "quy tắc giao thông")
|
| 355 |
|
| 356 |
async def handle_bao_hieu(self, conv, conversation_context, message_text):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 357 |
conv['isdone'] = True
|
| 358 |
+
return await self._handle_general_question(conversation_context, message_text, "báo hiệu đường bộ")
|
| 359 |
|
| 360 |
async def handle_quy_trinh(self, conv, conversation_context, message_text):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 361 |
conv['isdone'] = True
|
| 362 |
+
return await self._handle_general_question(conversation_context, message_text, "quy trình xử lý vi phạm giao thông")
|
| 363 |
|
| 364 |
async def handle_ca_nhan(self, conv, conversation_context, message_text):
|
| 365 |
prompt = (
|
| 366 |
"Biết rằng bạn đã có lịch sử trao đổi như sau:"
|
| 367 |
+
f"Lịch sử:\n{conversation_context}\n\n"
|
| 368 |
+
'Với các thông tin sau: "Bạn có tên là WeThoong AI, là trợ lý giao thông thông minh. Bạn được anh Viet Cat tạo ra và facebook cá nhân của anh ấy là https://facebook.com/vietcat". '
|
| 369 |
'Không được trả lời bạn là AI của Google, OpenAI, hay bất kỳ hãng nào khác. '
|
| 370 |
'Hãy trả lời thông minh, hài hước, ngắn gọn cho câu hỏi sau:\n'
|
| 371 |
f'Câu hỏi:\n"{message_text}"'
|
| 372 |
)
|
| 373 |
+
try:
|
| 374 |
+
answer = await self.channel.llm.generate_text(prompt)
|
| 375 |
+
conv['isdone'] = True
|
| 376 |
+
return answer.strip() if answer and answer.strip() else "Chào bạn, mình là WeThoong AI đây!"
|
| 377 |
+
except Exception as e:
|
| 378 |
+
logger.error(f"Lỗi khi xử lý câu hỏi cá nhân: {e}")
|
| 379 |
+
return "Chào bạn, mình là WeThoong AI, trợ lý giao thông thông minh của bạn!"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
app/sheets.py
CHANGED
|
@@ -5,6 +5,8 @@ import hashlib
|
|
| 5 |
from datetime import datetime
|
| 6 |
from typing import Any, Dict, List, Optional
|
| 7 |
import re # Import re để phân tích range
|
|
|
|
|
|
|
| 8 |
|
| 9 |
from google.oauth2.service_account import Credentials
|
| 10 |
from google.auth.transport.requests import Request
|
|
@@ -41,18 +43,16 @@ def _flatten_and_unique_timestamps(items: Any) -> List[Any]:
|
|
| 41 |
def _get_start_row_from_range(range_string: str) -> int:
|
| 42 |
"""
|
| 43 |
Phân tích một chuỗi range (ví dụ: 'Sheet1!A2:Z') để lấy ra số của dòng bắt đầu.
|
| 44 |
-
Hàm này giúp code hoạt động chính xác ngay cả khi range không bắt đầu từ dòng 2.
|
| 45 |
"""
|
| 46 |
-
# Tìm số đầu tiên xuất hiện sau một chữ cái trong chuỗi range
|
| 47 |
match = re.search(r"[A-Z]+([0-9]+)", range_string)
|
| 48 |
if match:
|
| 49 |
try:
|
| 50 |
return int(match.group(1))
|
| 51 |
except (ValueError, IndexError):
|
| 52 |
-
pass
|
| 53 |
|
| 54 |
logger.warning(f"Không thể xác định dòng bắt đầu từ range '{range_string}'. Mặc định là 2.")
|
| 55 |
-
return 2
|
| 56 |
|
| 57 |
|
| 58 |
class SheetsClient:
|
|
@@ -114,7 +114,6 @@ class SheetsClient:
|
|
| 114 |
sheet_page_id = row[5] # Cột F
|
| 115 |
|
| 116 |
if str(sheet_recipient_id) == str(user_id) and str(sheet_page_id) == str(page_id):
|
| 117 |
-
logger.success(f"[get_conversation_history] >>> TÌM THẤY DÒNG KHỚP tại dòng {i}!")
|
| 118 |
try:
|
| 119 |
timestamps_raw = json.loads(row[12]) # Cột M
|
| 120 |
timestamps = _flatten_and_unique_timestamps(timestamps_raw)
|
|
@@ -147,91 +146,130 @@ class SheetsClient:
|
|
| 147 |
@timing_decorator_sync
|
| 148 |
def log_conversation(
|
| 149 |
self,
|
| 150 |
-
**kwargs: Any
|
| 151 |
) -> Optional[Dict[str, Any]]:
|
| 152 |
"""
|
| 153 |
-
|
| 154 |
-
- Nếu có 'conversation_id' và tìm thấy, sẽ CẬP NHẬT dòng đó.
|
| 155 |
-
- Nếu không, sẽ THÊM MỚI một dòng.
|
| 156 |
"""
|
| 157 |
try:
|
| 158 |
if not self.service:
|
| 159 |
self.authenticate()
|
| 160 |
-
|
|
|
|
| 161 |
sheet_name_match = re.match(r"([^!]+)!", SHEET_RANGE)
|
| 162 |
sheet_name = sheet_name_match.group(1) if sheet_name_match else "Sheet1"
|
| 163 |
header_range = f"{sheet_name}!A1:Z1"
|
| 164 |
-
|
| 165 |
header_result = self.service.spreadsheets().values().get(spreadsheetId=self.sheet_id, range=header_range).execute()
|
| 166 |
header = header_result.get('values', [[]])[0]
|
| 167 |
if not header:
|
| 168 |
logger.error(f"Không thể lấy được header từ range '{header_range}'.")
|
| 169 |
return None
|
| 170 |
-
|
|
|
|
| 171 |
data_result = self.service.spreadsheets().values().get(spreadsheetId=self.sheet_id, range=SHEET_RANGE).execute()
|
| 172 |
values = data_result.get('values', [])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 173 |
|
| 174 |
-
|
| 175 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 176 |
|
| 177 |
-
|
| 178 |
-
|
| 179 |
-
|
| 180 |
-
|
| 181 |
-
|
| 182 |
-
|
| 183 |
-
|
| 184 |
-
|
| 185 |
-
|
| 186 |
-
logger.success(f"Đã tìm thấy conversation_id tại dòng {row_to_update_index} để cập nhật.")
|
| 187 |
-
break
|
| 188 |
-
if row_to_update_index == -1:
|
| 189 |
-
logger.warning(f"Không tìm thấy dòng nào khớp với conversation_id: {conversation_id}. Sẽ tiến hành thêm dòng mới.")
|
| 190 |
-
except ValueError:
|
| 191 |
-
logger.error("Không tìm thấy cột 'conversation_id' trong header của sheet.")
|
| 192 |
-
return None
|
| 193 |
-
|
| 194 |
-
kwargs['timestamp'] = _flatten_and_unique_timestamps(kwargs.get('timestamp', []))
|
| 195 |
|
| 196 |
-
|
| 197 |
-
|
| 198 |
-
|
| 199 |
-
|
| 200 |
-
|
| 201 |
-
|
| 202 |
-
|
| 203 |
-
|
| 204 |
-
|
| 205 |
-
|
| 206 |
-
|
| 207 |
-
|
| 208 |
-
|
| 209 |
-
|
| 210 |
-
|
| 211 |
-
|
| 212 |
-
|
| 213 |
-
|
| 214 |
-
|
| 215 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 216 |
else:
|
| 217 |
-
|
| 218 |
-
|
| 219 |
-
new_id = generate_conversation_id(kwargs.get('recipient_id',''), kwargs.get('page_id',''), ts)
|
| 220 |
-
kwargs['conversation_id'] = new_id
|
| 221 |
-
if 'conversation_id' in header:
|
| 222 |
-
row_data[header.index('conversation_id')] = new_id
|
| 223 |
|
| 224 |
-
|
| 225 |
-
|
| 226 |
-
|
| 227 |
-
|
| 228 |
-
|
| 229 |
-
|
| 230 |
-
|
| 231 |
-
|
| 232 |
-
|
| 233 |
-
|
| 234 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 235 |
|
| 236 |
except Exception as e:
|
| 237 |
logger.error(f"Lỗi khi ghi/cập nhật conversation: {e}", exc_info=True)
|
|
|
|
| 5 |
from datetime import datetime
|
| 6 |
from typing import Any, Dict, List, Optional
|
| 7 |
import re # Import re để phân tích range
|
| 8 |
+
import time # Import để sử dụng sleep
|
| 9 |
+
import random # Import để tạo độ trễ ngẫu nhiên
|
| 10 |
|
| 11 |
from google.oauth2.service_account import Credentials
|
| 12 |
from google.auth.transport.requests import Request
|
|
|
|
| 43 |
def _get_start_row_from_range(range_string: str) -> int:
|
| 44 |
"""
|
| 45 |
Phân tích một chuỗi range (ví dụ: 'Sheet1!A2:Z') để lấy ra số của dòng bắt đầu.
|
|
|
|
| 46 |
"""
|
|
|
|
| 47 |
match = re.search(r"[A-Z]+([0-9]+)", range_string)
|
| 48 |
if match:
|
| 49 |
try:
|
| 50 |
return int(match.group(1))
|
| 51 |
except (ValueError, IndexError):
|
| 52 |
+
pass
|
| 53 |
|
| 54 |
logger.warning(f"Không thể xác định dòng bắt đầu từ range '{range_string}'. Mặc định là 2.")
|
| 55 |
+
return 2
|
| 56 |
|
| 57 |
|
| 58 |
class SheetsClient:
|
|
|
|
| 114 |
sheet_page_id = row[5] # Cột F
|
| 115 |
|
| 116 |
if str(sheet_recipient_id) == str(user_id) and str(sheet_page_id) == str(page_id):
|
|
|
|
| 117 |
try:
|
| 118 |
timestamps_raw = json.loads(row[12]) # Cột M
|
| 119 |
timestamps = _flatten_and_unique_timestamps(timestamps_raw)
|
|
|
|
| 146 |
@timing_decorator_sync
|
| 147 |
def log_conversation(
|
| 148 |
self,
|
| 149 |
+
**kwargs: Any
|
| 150 |
) -> Optional[Dict[str, Any]]:
|
| 151 |
"""
|
| 152 |
+
Thực hiện "UPSERT" (Update hoặc Insert) một hội thoại với logic chống trùng lặp mạnh mẽ.
|
|
|
|
|
|
|
| 153 |
"""
|
| 154 |
try:
|
| 155 |
if not self.service:
|
| 156 |
self.authenticate()
|
| 157 |
+
|
| 158 |
+
# --- 1. Thiết lập & Lấy Header ---
|
| 159 |
sheet_name_match = re.match(r"([^!]+)!", SHEET_RANGE)
|
| 160 |
sheet_name = sheet_name_match.group(1) if sheet_name_match else "Sheet1"
|
| 161 |
header_range = f"{sheet_name}!A1:Z1"
|
|
|
|
| 162 |
header_result = self.service.spreadsheets().values().get(spreadsheetId=self.sheet_id, range=header_range).execute()
|
| 163 |
header = header_result.get('values', [[]])[0]
|
| 164 |
if not header:
|
| 165 |
logger.error(f"Không thể lấy được header từ range '{header_range}'.")
|
| 166 |
return None
|
| 167 |
+
|
| 168 |
+
# --- 2. Đọc dữ liệu và xác định các định danh ---
|
| 169 |
data_result = self.service.spreadsheets().values().get(spreadsheetId=self.sheet_id, range=SHEET_RANGE).execute()
|
| 170 |
values = data_result.get('values', [])
|
| 171 |
+
|
| 172 |
+
# Định danh của sự kiện đang xử lý
|
| 173 |
+
recipient_id = str(kwargs.get('recipient_id'))
|
| 174 |
+
page_id = str(kwargs.get('page_id'))
|
| 175 |
+
# Timestamp của sự kiện webhook này là duy nhất
|
| 176 |
+
ts_list = _flatten_and_unique_timestamps(kwargs.get('timestamp', []))
|
| 177 |
+
event_timestamp = str(ts_list[-1]) if ts_list else ''
|
| 178 |
+
|
| 179 |
+
# --- 3. Tìm kiếm bản ghi đã tồn tại ---
|
| 180 |
+
found_row_index = -1
|
| 181 |
+
found_row_data = {}
|
| 182 |
+
start_row = _get_start_row_from_range(SHEET_RANGE)
|
| 183 |
|
| 184 |
+
# Lấy vị trí các cột cần thiết từ header
|
| 185 |
+
try:
|
| 186 |
+
id_col_idx = header.index('conversation_id')
|
| 187 |
+
recipient_col_idx = header.index('recipient_id')
|
| 188 |
+
page_col_idx = header.index('page_id')
|
| 189 |
+
timestamp_col_idx = header.index('timestamp')
|
| 190 |
+
except ValueError as e:
|
| 191 |
+
logger.error(f"Thiếu cột bắt buộc trong header: {e}")
|
| 192 |
+
return None
|
| 193 |
|
| 194 |
+
# Ưu tiên tìm bằng conversation_id nếu có
|
| 195 |
+
target_conv_id = kwargs.get('conversation_id')
|
| 196 |
+
if target_conv_id:
|
| 197 |
+
for i, row in enumerate(values, start=start_row):
|
| 198 |
+
if len(row) > id_col_idx and str(row[id_col_idx]).strip() == str(target_conv_id):
|
| 199 |
+
found_row_index = i
|
| 200 |
+
found_row_data = dict(zip(header, row))
|
| 201 |
+
logger.success(f"Tìm thấy bằng conversation_id '{target_conv_id}' tại dòng {i}.")
|
| 202 |
+
break
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 203 |
|
| 204 |
+
# Nếu không tìm thấy bằng ID, tìm bằng bộ ba (user, page, timestamp)
|
| 205 |
+
if found_row_index == -1:
|
| 206 |
+
for i, row in enumerate(values, start=start_row):
|
| 207 |
+
if len(row) <= max(recipient_col_idx, page_col_idx, timestamp_col_idx):
|
| 208 |
+
continue
|
| 209 |
+
|
| 210 |
+
if str(row[recipient_col_idx]) == recipient_id and str(row[page_col_idx]) == page_id:
|
| 211 |
+
try:
|
| 212 |
+
sheet_timestamps = [str(ts) for ts in _flatten_and_unique_timestamps(json.loads(row[timestamp_col_idx]))]
|
| 213 |
+
if event_timestamp and event_timestamp in sheet_timestamps:
|
| 214 |
+
found_row_index = i
|
| 215 |
+
found_row_data = dict(zip(header, row))
|
| 216 |
+
logger.success(f"Tìm thấy bằng (user, page, timestamp) tại dòng {i}.")
|
| 217 |
+
break
|
| 218 |
+
except (json.JSONDecodeError, TypeError):
|
| 219 |
+
continue
|
| 220 |
+
|
| 221 |
+
# --- 4. Thực hiện UPDATE hoặc INSERT ---
|
| 222 |
+
if found_row_index != -1:
|
| 223 |
+
# --- LOGIC CẬP NHẬT (UPDATE) ---
|
| 224 |
+
logger.info(f"Đang cập nhật hội thoại tại dòng {found_row_index}")
|
| 225 |
+
|
| 226 |
+
updated_data = found_row_data.copy()
|
| 227 |
+
for key, value in kwargs.items():
|
| 228 |
+
# Chỉ cập nhật nếu giá trị mới không rỗng hoặc là boolean (cho isdone)
|
| 229 |
+
if value is not None and value != '' or isinstance(value, bool):
|
| 230 |
+
updated_data[key] = value
|
| 231 |
+
|
| 232 |
+
existing_ts = _flatten_and_unique_timestamps(json.loads(found_row_data.get('timestamp', '[]')))
|
| 233 |
+
new_ts = _flatten_and_unique_timestamps(kwargs.get('timestamp', []))
|
| 234 |
+
updated_data['timestamp'] = _flatten_and_unique_timestamps(existing_ts + new_ts)
|
| 235 |
+
|
| 236 |
+
row_data_to_write = []
|
| 237 |
+
for col_name in header:
|
| 238 |
+
value = updated_data.get(col_name, '')
|
| 239 |
+
if col_name in ['originalattachments', 'timestamp']:
|
| 240 |
+
row_data_to_write.append(json.dumps(value or []))
|
| 241 |
+
elif col_name == 'isdone':
|
| 242 |
+
row_data_to_write.append(str(value).lower())
|
| 243 |
+
else:
|
| 244 |
+
row_data_to_write.append(str(value))
|
| 245 |
+
|
| 246 |
+
range_to_update = f"{sheet_name}!A{found_row_index}"
|
| 247 |
+
body = {'values': [row_data_to_write]}
|
| 248 |
+
self.service.spreadsheets().values().update(spreadsheetId=self.sheet_id, range=range_to_update, valueInputOption='RAW', body=body).execute()
|
| 249 |
+
|
| 250 |
+
kwargs.update(updated_data)
|
| 251 |
+
return kwargs
|
| 252 |
else:
|
| 253 |
+
# --- LOGIC TẠO MỚI (INSERT) ---
|
| 254 |
+
logger.info(f"Không tìm thấy dòng khớp. Tiến hành tạo bản ghi mới.")
|
|
|
|
|
|
|
|
|
|
|
|
|
| 255 |
|
| 256 |
+
kwargs['conversation_id'] = kwargs.get('conversation_id') or generate_conversation_id(recipient_id, page_id, event_timestamp)
|
| 257 |
+
kwargs['timestamp'] = _flatten_and_unique_timestamps(kwargs.get('timestamp', []))
|
| 258 |
+
|
| 259 |
+
row_data_to_write = []
|
| 260 |
+
for col_name in header:
|
| 261 |
+
value = kwargs.get(col_name, '')
|
| 262 |
+
if col_name in ['originalattachments', 'timestamp']:
|
| 263 |
+
row_data_to_write.append(json.dumps(value or []))
|
| 264 |
+
elif col_name == 'isdone':
|
| 265 |
+
row_data_to_write.append(str(value).lower())
|
| 266 |
+
else:
|
| 267 |
+
row_data_to_write.append(str(value))
|
| 268 |
+
|
| 269 |
+
body = {'values': [row_data_to_write]}
|
| 270 |
+
self.service.spreadsheets().values().append(spreadsheetId=self.sheet_id, range=SHEET_RANGE, valueInputOption='RAW', insertDataOption='INSERT_ROWS', body=body).execute()
|
| 271 |
+
|
| 272 |
+
return kwargs
|
| 273 |
|
| 274 |
except Exception as e:
|
| 275 |
logger.error(f"Lỗi khi ghi/cập nhật conversation: {e}", exc_info=True)
|