VietCat commited on
Commit
e68c212
·
1 Parent(s): 5b886d1

refactor chat channel structure

Browse files
Files changed (1) hide show
  1. app/main.py +8 -350
app/main.py CHANGED
@@ -21,6 +21,7 @@ from .llm import create_llm_client
21
  from .reranker import Reranker
22
  from .request_limit_manager import RequestLimitManager
23
  from .law_document_chunker import LawDocumentChunker
 
24
 
25
  app = FastAPI(title="WeBot Facebook Messenger API")
26
 
@@ -180,361 +181,18 @@ async def webhook(request: Request):
180
  if not message_data:
181
  return {"status": "ok"}
182
 
183
- # Process the message
184
- await process_message(message_data)
185
-
 
 
 
 
186
  return {"status": "ok"}
187
  except Exception as e:
188
  logger.error(f"Error processing webhook: {e}\nTraceback: {traceback.format_exc()}")
189
  raise HTTPException(status_code=500, detail="Internal server error")
190
 
191
- @timing_decorator_async
192
- async def process_message(message_data: Dict[str, Any]):
193
- # Kiểm tra message_data hợp lệ và đủ trường
194
- if not message_data or not isinstance(message_data, dict):
195
- logger.error(f"[ERROR] Invalid message_data: {message_data}")
196
- return
197
- required_fields = ["sender_id", "page_id", "text", "timestamp"]
198
- for field in required_fields:
199
- if field not in message_data:
200
- logger.error(f"[ERROR] Missing field {field} in message_data: {message_data}")
201
- return
202
- sender_id = message_data["sender_id"]
203
- page_id = message_data["page_id"]
204
- message_text = message_data["text"]
205
- timestamp = message_data["timestamp"]
206
- attachments = message_data.get('attachments', [])
207
- logger.bind(user_id=sender_id, page_id=page_id, message=message_text).info("Processing message")
208
-
209
- # Nếu không có message_text và attachments, không xử lý
210
- if not message_text and not attachments:
211
- logger.info(f"[DEBUG] Không có message_text và attachments, không xử lý...")
212
- return
213
-
214
- # Get conversation history (run in thread pool)
215
- loop = asyncio.get_event_loop()
216
- history = await loop.run_in_executor(
217
- executor, lambda: sheets_client.get_conversation_history(sender_id, page_id)
218
- )
219
- logger.info(f"[DEBUG] history: {history}")
220
-
221
- log_kwargs = {
222
- 'conversation_id': None,
223
- 'recipient_id': sender_id,
224
- 'page_id': page_id,
225
- 'originaltext': message_text,
226
- 'originalcommand': '',
227
- 'originalcontent': '',
228
- 'originalattachments': attachments,
229
- 'originalvehicle': '',
230
- 'originalaction': '',
231
- 'originalpurpose': '',
232
- 'timestamp': [timestamp],
233
- 'isdone': False
234
- }
235
-
236
- logger.info(f"[DEBUG] Message cơ bản: {log_kwargs}")
237
- conv = None
238
-
239
- if history:
240
- # 1. Chặn duplicate message (trùng sender_id, page_id, timestamp)
241
- for row in history:
242
- row_timestamps = flatten_timestamp(row.get('timestamp', []))
243
- if isinstance(row_timestamps, list) and len(row_timestamps) == 1 and isinstance(row_timestamps[0], list):
244
- row_timestamps = row_timestamps[0]
245
- if (
246
- str(timestamp) in [str(ts) for ts in row_timestamps]
247
- and str(row.get('recipient_id')) == str(sender_id)
248
- and str(row.get('page_id')) == str(page_id)
249
- ):
250
- logger.info("[DUPLICATE] Message duplicate, skipping log.")
251
- return
252
- conv = {
253
- 'conversation_id': row.get('conversation_id'),
254
- 'recipient_id': row.get('recipient_id'),
255
- 'page_id': row.get('page_id'),
256
- 'originaltext': row.get('originaltext'),
257
- 'originalcommand': row.get('originalcommand'),
258
- 'originalcontent': row.get('originalcontent'),
259
- 'originalattachments': row.get('originalattachments'),
260
- 'originalvehicle': row.get('originalvehicle'),
261
- 'originalaction': row.get('originalaction'),
262
- 'originalpurpose': row.get('originalpurpose'),
263
- 'timestamp': row_timestamps,
264
- 'isdone': row.get('isdone')
265
- }
266
- else:
267
- # 2. Ghi conversation mới NGAY LẬP TỨC với thông tin cơ bản
268
- conv = await loop.run_in_executor(executor, lambda: sheets_client.log_conversation(**log_kwargs))
269
-
270
- if not conv:
271
- logger.error("Không thể tạo conversation mới!")
272
- return
273
- else:
274
- logger.info(f"[DEBUG] Message history: {conv}")
275
- for key, value in log_kwargs.items():
276
- if value not in (None, "", []) and conv.get(key) in (None, "", []):
277
- conv[key] = value
278
- # Thêm timestamp mới nếu chưa có
279
- conv['timestamp'] = flatten_timestamp(conv['timestamp'])
280
- if timestamp not in conv['timestamp']:
281
- conv['timestamp'].append(timestamp)
282
-
283
- logger.info(f"[DEBUG] Message history sau update: {conv}")
284
-
285
- await loop.run_in_executor(executor, lambda: sheets_client.log_conversation(**conv))
286
-
287
- # Get page access token (sync)
288
- page_token = supabase_client.get_page_token(page_id)
289
- if page_token:
290
- logger.info(f"[DEBUG] page_token: {page_token[:10]} ... {page_token[-10:]}")
291
- else:
292
- logger.info(f"[DEBUG] page_token: None")
293
-
294
- # Nếu không có page_token, không xử lý
295
- if not page_token:
296
- logger.error(f"No access token found for page {page_id}")
297
- return
298
-
299
- await facebook_client.send_message(page_token, sender_id, "Ok, để mình check. Bạn chờ mình chút xíu nhé!")
300
-
301
- # Extract command and keywords
302
- command, remaining_text = extract_command(message_text)
303
- # Sử dụng LLM để phân tích message_text và extract keywords, mục đích, hành vi vi phạm
304
- llm_analysis = await llm_client.analyze(message_text)
305
- logger.info(f"[LLM][RAW] Kết quả trả về từ analyze: {llm_analysis}")
306
- muc_dich = None
307
- hanh_vi_vi_pham = None
308
- if isinstance(llm_analysis, dict):
309
- keywords = [normalize_vehicle_keyword(llm_analysis.get('phuong_tien', ''))]
310
- muc_dich = llm_analysis.get('muc_dich')
311
- hanh_vi_vi_pham = llm_analysis.get('hanh_vi_vi_pham')
312
- elif isinstance(llm_analysis, list) and len(llm_analysis) > 0:
313
- keywords = [normalize_vehicle_keyword(llm_analysis[0].get('phuong_tien', ''))]
314
- muc_dich = llm_analysis[0].get('muc_dich')
315
- hanh_vi_vi_pham = llm_analysis[0].get('hanh_vi_vi_pham')
316
- else:
317
- keywords = extract_keywords(message_text, VEHICLE_KEYWORDS)
318
- hanh_vi_vi_pham = message_text
319
- for kw in keywords:
320
- hanh_vi_vi_pham = hanh_vi_vi_pham.replace(kw, "")
321
- hanh_vi_vi_pham = hanh_vi_vi_pham.strip()
322
-
323
- logger.info(f"[DEBUG] Phương tiện: {keywords} - Hành vi: {hanh_vi_vi_pham} - Mục đích: {muc_dich}")
324
-
325
- await facebook_client.send_message(page_token, sender_id, f"... đang tìm kiếm hành vi {hanh_vi_vi_pham} .....")
326
-
327
- # 4. Update lại conversation với thông tin đầy đủ
328
- update_kwargs = {
329
- 'conversation_id': conv['conversation_id'],
330
- 'recipient_id': sender_id,
331
- 'page_id': page_id,
332
- 'originaltext': message_text,
333
- 'originalcommand': command,
334
- 'originalcontent': remaining_text,
335
- 'originalattachments': attachments,
336
- 'originalvehicle': ','.join(keywords),
337
- 'originalaction': hanh_vi_vi_pham,
338
- 'originalpurpose': muc_dich,
339
- 'timestamp': flatten_timestamp(conv['timestamp']),
340
- 'isdone': False
341
- }
342
-
343
- for key, value in update_kwargs.items():
344
- if value not in (None, "", []) and conv.get(key) in (None, "", []):
345
- conv[key] = value
346
- logger.info(f"[DEBUG] Message history update cuối cùng: {conv}")
347
- # 5. Xử lý logic nghiệp vụ
348
- response = await process_business_logic(conv, page_token)
349
- logger.info(f"[DEBUG] Message history sau khi process: {conv}")
350
-
351
- # 6. Gửi response và cập nhật final state
352
- await facebook_client.send_message(page_token, sender_id, response)
353
- await loop.run_in_executor(executor, lambda: sheets_client.log_conversation(**conv))
354
- return
355
-
356
- async def process_business_logic(log_kwargs: Dict[str, Any], page_token: str) -> str:
357
- """
358
- Xử lý logic nghiệp vụ dựa trên thông tin conversation.
359
- """
360
- command = log_kwargs.get('originalcommand', '')
361
- vehicle = log_kwargs.get('originalvehicle', '')
362
- action = log_kwargs.get('originalaction', '')
363
- message = log_kwargs.get('originaltext', '')
364
-
365
- # Tách vehicle thành list keywords
366
- keywords = [kw.strip() for kw in vehicle.split(',') if kw.strip()]
367
-
368
- if not command:
369
- if keywords:
370
- # Có thông tin phương tiện
371
- if action:
372
- logger.info(f"[DEBUG] tạo embedding: {action}")
373
- embedding = await embedding_client.create_embedding(action)
374
- logger.info(f"[DEBUG] embedding: {embedding[:5]} ... (total {len(embedding)})")
375
- matches = supabase_client.match_documents(
376
- embedding,
377
- vehicle_keywords=keywords,
378
- user_question=action
379
- )
380
- logger.info(f"[DEBUG] matches: {matches}")
381
- if matches:
382
- response = await format_search_results(message, matches, page_token, log_kwargs['recipient_id'])
383
- else:
384
- response = "Xin lỗi, tôi không tìm thấy thông tin phù hợp."
385
- else:
386
- logger.info(f"[DEBUG] Không có hành vi vi phạm: {message}")
387
- response = "Xin lỗi, tôi không tìm thấy thông tin về hành vi vi phạm trong câu hỏi của bạn."
388
-
389
- log_kwargs['isdone'] = True
390
- else:
391
- # Không có thông tin phương tiện
392
- response = "Vui lòng cho biết loại phương tiện bạn cần tìm (xe máy, ô tô...)"
393
- log_kwargs['isdone'] = False
394
- else:
395
- # Có command
396
- if command == "xong":
397
- # Tạo bài viết mới trên page (placeholder)
398
- # TODO: Thay thế hàm này bằng logic thực tế
399
- post_url = await create_facebook_post(page_token, log_kwargs['recipient_id'], [log_kwargs])
400
- if post_url:
401
- response = f"Bài viết đã được tạo thành công! Bạn có thể xem tại: {post_url}"
402
- else:
403
- response = "Đã xảy ra lỗi khi tạo bài viết. Vui lòng thử lại sau."
404
- log_kwargs['isdone'] = True
405
- else:
406
- response = "Vui lòng cung cấp thêm thông tin và gõ lệnh \\xong khi hoàn tất."
407
- log_kwargs['isdone'] = False
408
-
409
- return response
410
-
411
- async def format_search_results(question: str, matches: List[Dict[str, Any]], page_token: str, sender_id: str) -> str:
412
- if not matches:
413
- return "Không tìm thấy kết quả phù hợp."
414
-
415
- await facebook_client.send_message(page_token, sender_id, f"Tôi tìm thấy một số quy định rồi .....")
416
- # Rerank matches trước khi format cho LLM
417
- try:
418
- reranked = await reranker.rerank(question, matches, top_k=5)
419
- if reranked:
420
- matches = reranked
421
- except Exception as e:
422
- logger.error(f"[RERANK] Lỗi khi rerank: {e}")
423
- # Tìm item có similarity cao nhất
424
- top = None
425
- top_result_text = ""
426
- full_result_text = ""
427
-
428
- def arr_to_str(arr, sep=", "):
429
- if not arr:
430
- return ""
431
- if isinstance(arr, list):
432
- return sep.join([str(x) for x in arr if x not in (None, "")])
433
- return str(arr)
434
-
435
- for i, match in enumerate(matches, 1):
436
- if not top or (match.get('similarity', 0) > top.get('similarity', 0)):
437
- top = match
438
-
439
- full_result_text += f"\n{match.get('structure', '').strip()}:\n"
440
- tieude = (match.get('tieude') or '').strip()
441
- noidung = (match.get('noidung') or '').strip()
442
- hanhvi = (tieude + "\n" + noidung).strip().replace('\n', ' ')
443
- full_result_text += f"Thực hiện hành vi:\n{hanhvi}"
444
- # Cá nhân bị phạt tiền
445
- canhantu = arr_to_str(match.get('canhantu'))
446
- canhanden = arr_to_str(match.get('canhanden'))
447
- if canhantu or canhanden:
448
- full_result_text += f"\nCá nhân sẽ bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ"
449
- # Tổ chức bị phạt tiền
450
- tochuctu = arr_to_str(match.get('tochuctu'))
451
- tochucden = arr_to_str(match.get('tochucden'))
452
- if tochuctu or tochucden:
453
- full_result_text += f"\nTổ chức sẽ bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ"
454
- # Hình phạt bổ sung
455
- hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ")
456
- if hpbsnoidung:
457
- full_result_text += f"\nNgoài việc bị phạt tiền, người vi phạm còn bị: {hpbsnoidung}"
458
- # Biện pháp khắc phục hậu quả
459
- bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ")
460
- if bpkpnoidung:
461
- full_result_text += f"\nNgoài ra, người vi phạm còn bị buộc: {bpkpnoidung}"
462
-
463
- if top and (top.get('tieude') or top.get('noidung')):
464
- tieude = (top.get('tieude') or '').strip()
465
- noidung = (top.get('noidung') or '').strip()
466
- hanhvi = (tieude + "\n" + noidung).strip().replace('\n', ' ')
467
- top_result_text += f"Thực hiện hành vi:\n{hanhvi}"
468
- canhantu = arr_to_str(top.get('canhantu'))
469
- canhanden = arr_to_str(top.get('canhanden'))
470
- if canhantu or canhanden:
471
- top_result_text += f"\nCá nhân sẽ bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ"
472
- tochuctu = arr_to_str(top.get('tochuctu'))
473
- tochucden = arr_to_str(top.get('tochucden'))
474
- if tochuctu or tochucden:
475
- top_result_text += f"\nTổ chức sẽ bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ"
476
- hpbsnoidung = arr_to_str(top.get('hpbsnoidung'), sep="; ")
477
- if hpbsnoidung:
478
- top_result_text += f"\nNgoài việc bị phạt tiền, người vi phạm còn bị: {hpbsnoidung}"
479
- bpkpnoidung = arr_to_str(top.get('bpkpnoidung'), sep="; ")
480
- if bpkpnoidung:
481
- top_result_text += f"\nNgoài ra, người vi phạm còn bị buộc: {bpkpnoidung}"
482
- else:
483
- result_text = "Không có kết quả phù hợp!"
484
-
485
- # Prompt cho LLM
486
- prompt = (
487
- "Bạn là một trợ lý AI có kiến thức pháp luật, hãy trả lời câu hỏi dựa trên các đoạn luật sau. "
488
- "Chỉ sử dụng thông tin có trong các đoạn, không tự đoán.\n"
489
- f"\nCác đoạn luật liên quan:\n{full_result_text}"
490
- "\n\nHãy trả lời ngắn gọn, dễ hiểu, trích dẫn rõ ràng thông tin từ các đoạn luật nếu cần."
491
- f"\n\nCâu hỏi của người dùng: {question}\n"
492
- )
493
-
494
- await facebook_client.send_message(page_token, sender_id, f"Được rồi, để tôi tóm tắt lại nhé .....")
495
- # Gọi LLM để sinh câu trả lời, fallback nếu lỗi
496
- try:
497
- answer = await llm_client.generate_text(prompt)
498
- if answer and answer.strip():
499
- logger.error(f"LLM trả về câu trả lời: \n\tanswer: {answer}")
500
- return answer.strip()
501
- else:
502
- logger.error(f"LLM không trả về câu trả lời phù hợp: \n\tanswer: {answer}")
503
- except Exception as e:
504
- logger.error(f"LLM không sẵn sàng: {e}\n{traceback.format_exc()}")
505
- # Fallback: trả về tổng hợp các đoạn luật như cũ
506
- fallback = "Tóm tắt các đoạn luật liên quan:\n\n"
507
- for i, match in enumerate(matches, 1):
508
- fallback += f"Đoạn {i}:\n"
509
- tieude = (match.get('tieude') or '').strip()
510
- noidung = (match.get('noidung') or '').strip()
511
- if tieude or noidung:
512
- fallback += f" - Hành vi: {(tieude + ' ' + noidung).strip()}\n"
513
- canhantu = arr_to_str(match.get('canhantu'))
514
- canhanden = arr_to_str(match.get('canhanden'))
515
- if canhantu or canhanden:
516
- fallback += f" - Cá nhân bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ\n"
517
- tochuctu = arr_to_str(match.get('tochuctu'))
518
- tochucden = arr_to_str(match.get('tochucden'))
519
- if tochuctu or tochucden:
520
- fallback += f" - Tổ chức bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ\n"
521
- hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ")
522
- if hpbsnoidung:
523
- fallback += f" - Hình phạt bổ sung: {hpbsnoidung}\n"
524
- bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ")
525
- if bpkpnoidung:
526
- fallback += f" - Biện pháp khắc phục hậu quả: {bpkpnoidung}\n"
527
- fallback += "\n"
528
- return fallback.strip()
529
-
530
- async def create_facebook_post(page_token: str, sender_id: str, history: List[Dict[str, Any]]) -> str:
531
- """
532
- Placeholder: Tạo bài viết mới trên page Facebook. Trả về URL bài viết nếu thành công, None nếu thất bại.
533
- """
534
- # TODO: Thay thế bằng logic thực tế
535
- logger.info(f"[MOCK] Creating Facebook post for sender_id={sender_id} with history={history}")
536
- return "https://facebook.com/mock_post_url"
537
-
538
  # ==================== DOCUMENT CHUNK MANAGEMENT APIs ====================
539
 
540
  @app.delete("/api/document-chunks/clear")
 
21
  from .reranker import Reranker
22
  from .request_limit_manager import RequestLimitManager
23
  from .law_document_chunker import LawDocumentChunker
24
+ from app.channel_manager import channel_manager
25
 
26
  app = FastAPI(title="WeBot Facebook Messenger API")
27
 
 
181
  if not message_data:
182
  return {"status": "ok"}
183
 
184
+ # --- Refactor: Lấy page_id, page_token, channel, gọi message_processor ---
185
+ page_id = message_data.get("page_id")
186
+ # Không lấy page_token ở đây nữa
187
+ # Lấy hoặc tạo ChatChannel
188
+ channel = channel_manager.get_or_create_channel("facebook", page_id)
189
+ # Gọi message_processor xử lý message
190
+ await channel.message_processor.process_message(message_data)
191
  return {"status": "ok"}
192
  except Exception as e:
193
  logger.error(f"Error processing webhook: {e}\nTraceback: {traceback.format_exc()}")
194
  raise HTTPException(status_code=500, detail="Internal server error")
195
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
196
  # ==================== DOCUMENT CHUNK MANAGEMENT APIs ====================
197
 
198
  @app.delete("/api/document-chunks/clear")