| from fastapi import FastAPI, Request, HTTPException, Depends |
| from fastapi.middleware.cors import CORSMiddleware |
| from loguru import logger |
| import json |
| from typing import Dict, Any, List |
| import asyncio |
| from concurrent.futures import ThreadPoolExecutor |
| import os |
| import traceback |
| import difflib |
|
|
| from .config import Settings, get_settings |
| from .facebook import FacebookClient |
| from .sheets import SheetsClient |
| from .supabase_db import SupabaseClient |
| from .embedding import EmbeddingClient |
| from .utils import setup_logging, extract_command, extract_keywords, timing_decorator_async, timing_decorator_sync, ensure_log_dir, validate_config |
| from .constants import VEHICLE_KEYWORDS, SHEET_RANGE, VEHICLE_KEYWORD_TO_COLUMN |
| from .health import router as health_router |
| from .llm import create_llm_client |
|
|
| app = FastAPI(title="WeBot Facebook Messenger API") |
|
|
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
| settings = get_settings() |
| setup_logging(settings.log_level) |
|
|
| logger.info("[STARTUP] Đang lấy PORT từ biến môi trường hoặc config...") |
| port = int(os.environ.get("PORT", settings.port if hasattr(settings, 'port') else 7860)) |
| logger.info(f"[STARTUP] PORT sử dụng: {port}") |
|
|
| logger.info("[STARTUP] Khởi tạo FacebookClient...") |
| facebook_client = FacebookClient(settings.facebook_app_secret) |
| logger.info("[STARTUP] Khởi tạo SheetsClient...") |
| sheets_client = SheetsClient( |
| settings.google_sheets_credentials_file, |
| settings.google_sheets_token_file, |
| settings.conversation_sheet_id |
| ) |
| logger.info("[STARTUP] Khởi tạo SupabaseClient...") |
| supabase_client = SupabaseClient(settings.supabase_url, settings.supabase_key) |
| logger.info("[STARTUP] Khởi tạo EmbeddingClient...") |
| embedding_client = EmbeddingClient() |
|
|
| |
| VEHICLE_KEYWORDS = ["xe máy", "ô tô", "xe đạp", "xe hơi"] |
|
|
| |
| |
| |
| |
| |
| |
| llm_client = create_llm_client( |
| provider="gemini", |
| api_key=settings.gemini_api_key, |
| base_url=settings.gemini_base_url, |
| model=settings.gemini_model |
| ) |
|
|
| logger.info("[STARTUP] Mount health router...") |
| app.include_router(health_router) |
|
|
| logger.info("[STARTUP] Validate config...") |
| validate_config(settings) |
|
|
| executor = ThreadPoolExecutor(max_workers=4) |
|
|
| message_text = None |
|
|
| def flatten_timestamp(ts): |
| flat = [] |
| for t in ts: |
| if isinstance(t, list): |
| flat.extend(flatten_timestamp(t)) |
| else: |
| flat.append(t) |
| return flat |
|
|
| def normalize_vehicle_keyword(keyword: str) -> str: |
| """ |
| Chuẩn hoá giá trị phương tiện về đúng từ khoá gần nhất trong VEHICLE_KEYWORDS. |
| Nếu không khớp, trả về keyword gốc. |
| """ |
| if not keyword: |
| return "" |
| matches = difflib.get_close_matches(keyword.lower(), [k.lower() for k in VEHICLE_KEYWORDS], n=1, cutoff=0.6) |
| if matches: |
| |
| for k in VEHICLE_KEYWORDS: |
| if k.lower() == matches[0]: |
| return k |
| return keyword |
|
|
| @app.get("/") |
| async def root(): |
| """Endpoint root để kiểm tra trạng thái app.""" |
| logger.info("[HEALTH] Truy cập endpoint root /") |
| return {"status": "ok"} |
|
|
| @app.get("/webhook") |
| async def verify_webhook(request: Request): |
| """ |
| Xác thực webhook Facebook Messenger. |
| Input: request (Request) - request từ Facebook với các query params. |
| Output: Trả về challenge nếu verify thành công, lỗi nếu thất bại. |
| """ |
| params = dict(request.query_params) |
| |
| mode = params.get("hub.mode") |
| token = str(params.get("hub.verify_token", "")) |
| challenge = str(params.get("hub.challenge", "")) |
|
|
| if not all([mode, token, challenge]): |
| raise HTTPException(status_code=400, detail="Missing parameters") |
|
|
| return await facebook_client.verify_webhook( |
| token, |
| challenge, |
| settings.facebook_verify_token |
| ) |
|
|
| @app.post("/webhook") |
| @timing_decorator_async |
| async def webhook(request: Request): |
| """ |
| Nhận và xử lý message từ Facebook Messenger webhook. |
| Input: request (Request) - request chứa payload JSON từ Facebook. |
| Output: JSON status. |
| """ |
|
|
| logger.info(f"[DEBUG] Nhận message từ Facebook Messenger webhook...") |
| body_bytes = await request.body() |
| |
| |
| if not facebook_client.verify_signature(request, body_bytes): |
| raise HTTPException(status_code=403, detail="Invalid signature") |
|
|
| try: |
| body = json.loads(body_bytes) |
| |
| is_echo = ( |
| isinstance(body, dict) |
| and "entry" in body |
| and isinstance(body["entry"], list) |
| and len(body["entry"]) > 0 |
| and "messaging" in body["entry"][0] |
| and isinstance(body["entry"][0]["messaging"], list) |
| and len(body["entry"][0]["messaging"]) > 0 |
| and body["entry"][0]["messaging"][0].get("message", {}).get("is_echo", False) |
| ) |
| if is_echo: |
| logger.info(f"[DEBUG] Message is echo, skipping...") |
| return {"status": "ok"} |
| else: |
| message_data = facebook_client.parse_message(body) |
| logger.info(f"[DEBUG] message_data: {message_data}") |
| |
| if not message_data: |
| return {"status": "ok"} |
|
|
| |
| await process_message(message_data) |
| |
| return {"status": "ok"} |
| except Exception as e: |
| logger.error(f"Error processing webhook: {e}\nTraceback: {traceback.format_exc()}") |
| raise HTTPException(status_code=500, detail="Internal server error") |
|
|
| @timing_decorator_async |
| async def process_message(message_data: Dict[str, Any]): |
| |
| if not message_data or not isinstance(message_data, dict): |
| logger.error(f"[ERROR] Invalid message_data: {message_data}") |
| return |
| required_fields = ["sender_id", "page_id", "text", "timestamp"] |
| for field in required_fields: |
| if field not in message_data: |
| logger.error(f"[ERROR] Missing field {field} in message_data: {message_data}") |
| return |
| sender_id = message_data["sender_id"] |
| page_id = message_data["page_id"] |
| message_text = message_data["text"] |
| timestamp = message_data["timestamp"] |
| attachments = message_data.get('attachments', []) |
| logger.bind(user_id=sender_id, page_id=page_id, message=message_text).info("Processing message") |
|
|
| |
| if not message_text and not attachments: |
| logger.info(f"[DEBUG] Không có message_text và attachments, không xử lý...") |
| return |
|
|
| |
| loop = asyncio.get_event_loop() |
| history = await loop.run_in_executor( |
| executor, lambda: sheets_client.get_conversation_history(sender_id, page_id) |
| ) |
| logger.info(f"[DEBUG] history: {history}") |
|
|
| log_kwargs = { |
| 'conversation_id': None, |
| 'recipient_id': sender_id, |
| 'page_id': page_id, |
| 'originaltext': message_text, |
| 'originalcommand': '', |
| 'originalcontent': '', |
| 'originalattachments': attachments, |
| 'originalvehicle': '', |
| 'originalaction': '', |
| 'originalpurpose': '', |
| 'timestamp': [timestamp], |
| 'isdone': False |
| } |
|
|
| logger.info(f"[DEBUG] Message cơ bản: {log_kwargs}") |
| conv = None |
|
|
| if history: |
| |
| for row in history: |
| row_timestamps = flatten_timestamp(row.get('timestamp', [])) |
| if isinstance(row_timestamps, list) and len(row_timestamps) == 1 and isinstance(row_timestamps[0], list): |
| row_timestamps = row_timestamps[0] |
| if ( |
| str(timestamp) in [str(ts) for ts in row_timestamps] |
| and str(row.get('recipient_id')) == str(sender_id) |
| and str(row.get('page_id')) == str(page_id) |
| ): |
| logger.info("[DUPLICATE] Message duplicate, skipping log.") |
| return |
| conv = { |
| 'conversation_id': row.get('conversation_id'), |
| 'recipient_id': row.get('recipient_id'), |
| 'page_id': row.get('page_id'), |
| 'originaltext': row.get('originaltext'), |
| 'originalcommand': row.get('originalcommand'), |
| 'originalcontent': row.get('originalcontent'), |
| 'originalattachments': row.get('originalattachments'), |
| 'originalvehicle': row.get('originalvehicle'), |
| 'originalaction': row.get('originalaction'), |
| 'originalpurpose': row.get('originalpurpose'), |
| 'timestamp': row_timestamps, |
| 'isdone': row.get('isdone') |
| } |
| else: |
| |
| conv = await loop.run_in_executor(executor, lambda: sheets_client.log_conversation(**log_kwargs)) |
| |
| if not conv: |
| logger.error("Không thể tạo conversation mới!") |
| return |
| else: |
| logger.info(f"[DEBUG] Message history: {conv}") |
| for key, value in log_kwargs.items(): |
| if value not in (None, "", []) and conv.get(key) in (None, "", []): |
| conv[key] = value |
| |
| conv['timestamp'] = flatten_timestamp(conv['timestamp']) |
| if timestamp not in conv['timestamp']: |
| conv['timestamp'].append(timestamp) |
| |
| logger.info(f"[DEBUG] Message history sau update: {conv}") |
|
|
| await loop.run_in_executor(executor, lambda: sheets_client.log_conversation(**conv)) |
|
|
| |
| page_token = supabase_client.get_page_token(page_id) |
| if page_token: |
| logger.info(f"[DEBUG] page_token: {page_token[:10]} ... {page_token[-10:]}") |
| else: |
| logger.info(f"[DEBUG] page_token: None") |
|
|
| |
| if not page_token: |
| logger.error(f"No access token found for page {page_id}") |
| return |
|
|
| await facebook_client.send_message(page_token, sender_id, "Ok, để mình check. Bạn chờ mình chút xíu nhé!") |
|
|
| |
| command, remaining_text = extract_command(message_text) |
| |
| llm_analysis = await llm_client.analyze(message_text) |
| logger.info(f"[LLM][RAW] Kết quả trả về từ analyze: {llm_analysis}") |
| muc_dich = None |
| hanh_vi_vi_pham = None |
| if isinstance(llm_analysis, dict): |
| keywords = [normalize_vehicle_keyword(llm_analysis.get('phuong_tien', ''))] |
| muc_dich = llm_analysis.get('muc_dich') |
| hanh_vi_vi_pham = llm_analysis.get('hanh_vi_vi_pham') |
| elif isinstance(llm_analysis, list) and len(llm_analysis) > 0: |
| keywords = [normalize_vehicle_keyword(llm_analysis[0].get('phuong_tien', ''))] |
| muc_dich = llm_analysis[0].get('muc_dich') |
| hanh_vi_vi_pham = llm_analysis[0].get('hanh_vi_vi_pham') |
| else: |
| keywords = extract_keywords(message_text, VEHICLE_KEYWORDS) |
| hanh_vi_vi_pham = message_text |
| for kw in keywords: |
| hanh_vi_vi_pham = hanh_vi_vi_pham.replace(kw, "") |
| hanh_vi_vi_pham = hanh_vi_vi_pham.strip() |
|
|
| logger.info(f"[DEBUG] Phương tiện: {keywords} - Hành vi: {hanh_vi_vi_pham} - Mục đích: {muc_dich}") |
| |
| await facebook_client.send_message(page_token, sender_id, "Mình đang phân tích câu hỏi của bạn.....") |
| |
| |
| update_kwargs = { |
| 'conversation_id': conv['conversation_id'], |
| 'recipient_id': sender_id, |
| 'page_id': page_id, |
| 'originaltext': message_text, |
| 'originalcommand': command, |
| 'originalcontent': remaining_text, |
| 'originalattachments': attachments, |
| 'originalvehicle': ','.join(keywords), |
| 'originalaction': hanh_vi_vi_pham, |
| 'originalpurpose': muc_dich, |
| 'timestamp': flatten_timestamp(conv['timestamp']), |
| 'isdone': False |
| } |
|
|
| for key, value in update_kwargs.items(): |
| if value not in (None, "", []) and conv.get(key) in (None, "", []): |
| conv[key] = value |
| logger.info(f"[DEBUG] Message history update cuối cùng: {conv}") |
| |
| response = await process_business_logic(conv, page_token) |
| logger.info(f"[DEBUG] Message history sau khi process: {conv}") |
|
|
| |
| await facebook_client.send_message(page_token, sender_id, response) |
| await loop.run_in_executor(executor, lambda: sheets_client.log_conversation(**conv)) |
| return |
|
|
| async def process_business_logic(log_kwargs: Dict[str, Any], page_token: str) -> str: |
| """ |
| Xử lý logic nghiệp vụ dựa trên thông tin conversation. |
| """ |
| command = log_kwargs.get('originalcommand', '') |
| vehicle = log_kwargs.get('originalvehicle', '') |
| action = log_kwargs.get('originalaction', '') |
| message = log_kwargs.get('originaltext', '') |
| |
| |
| keywords = [kw.strip() for kw in vehicle.split(',') if kw.strip()] |
| |
| if not command: |
| if keywords: |
| |
| if action: |
| logger.info(f"[DEBUG] tạo embedding: {action}") |
| embedding = await embedding_client.create_embedding(action) |
| logger.info(f"[DEBUG] embedding: {embedding[:5]} ... (total {len(embedding)})") |
| matches = supabase_client.match_documents(embedding, vehicle_keywords=keywords) |
| logger.info(f"[DEBUG] matches: {matches}") |
| if matches: |
| response = await format_search_results(message, matches) |
| else: |
| response = "Xin lỗi, tôi không tìm thấy thông tin phù hợp." |
| else: |
| logger.info(f"[DEBUG] Không có hành vi vi phạm: {message}") |
| response = "Xin lỗi, tôi không tìm thấy thông tin về hành vi vi phạm trong câu hỏi của bạn." |
| |
| log_kwargs['isdone'] = True |
| else: |
| |
| response = "Vui lòng cho biết loại phương tiện bạn cần tìm (xe máy, ô tô...)" |
| log_kwargs['isdone'] = False |
| else: |
| |
| if command == "xong": |
| |
| |
| post_url = await create_facebook_post(page_token, log_kwargs['recipient_id'], [log_kwargs]) |
| if post_url: |
| response = f"Bài viết đã được tạo thành công! Bạn có thể xem tại: {post_url}" |
| else: |
| response = "Đã xảy ra lỗi khi tạo bài viết. Vui lòng thử lại sau." |
| log_kwargs['isdone'] = True |
| else: |
| response = "Vui lòng cung cấp thêm thông tin và gõ lệnh \\xong khi hoàn tất." |
| log_kwargs['isdone'] = False |
| |
| return response |
|
|
| async def format_search_results(question: str, matches: List[Dict[str, Any]]) -> str: |
| if not matches: |
| return "Không tìm thấy kết quả phù hợp." |
| |
| top = None |
| top_result_text = "" |
| full_result_text = "" |
|
|
| def arr_to_str(arr, sep=", "): |
| if not arr: |
| return "" |
| if isinstance(arr, list): |
| return sep.join([str(x) for x in arr if x not in (None, "")]) |
| return str(arr) |
|
|
| for i, match in enumerate(matches, 1): |
| if not top or (match.get('similarity', 0) > top.get('similarity', 0)): |
| top = match |
|
|
| full_result_text += f"\nĐoạn {i}:\n" |
| tieude = (match.get('tieude') or '').strip() |
| noidung = (match.get('noidung') or '').strip() |
| hanhvi = (tieude + "\n" + noidung).strip().replace('\n', ' ') |
| full_result_text += f"Thực hiện hành vi:\n{hanhvi}" |
| |
| canhantu = arr_to_str(match.get('canhantu')) |
| canhanden = arr_to_str(match.get('canhanden')) |
| if canhantu or canhanden: |
| full_result_text += f"\nCá nhân sẽ bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ" |
| |
| tochuctu = arr_to_str(match.get('tochuctu')) |
| tochucden = arr_to_str(match.get('tochucden')) |
| if tochuctu or tochucden: |
| full_result_text += f"\nTổ chức sẽ bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ" |
| |
| hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ") |
| if hpbsnoidung: |
| full_result_text += f"\nNgoài việc bị phạt tiền, người vi phạm còn bị: {hpbsnoidung}" |
| |
| bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ") |
| if bpkpnoidung: |
| full_result_text += f"\nNgoài ra, người vi phạm còn bị buộc: {bpkpnoidung}" |
|
|
| if top and (top.get('tieude') or top.get('noidung')): |
| tieude = (top.get('tieude') or '').strip() |
| noidung = (top.get('noidung') or '').strip() |
| hanhvi = (tieude + "\n" + noidung).strip().replace('\n', ' ') |
| top_result_text += f"Thực hiện hành vi:\n{hanhvi}" |
| canhantu = arr_to_str(top.get('canhantu')) |
| canhanden = arr_to_str(top.get('canhanden')) |
| if canhantu or canhanden: |
| top_result_text += f"\nCá nhân sẽ bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ" |
| tochuctu = arr_to_str(top.get('tochuctu')) |
| tochucden = arr_to_str(top.get('tochucden')) |
| if tochuctu or tochucden: |
| top_result_text += f"\nTổ chức sẽ bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ" |
| hpbsnoidung = arr_to_str(top.get('hpbsnoidung'), sep="; ") |
| if hpbsnoidung: |
| top_result_text += f"\nNgoài việc bị phạt tiền, người vi phạm còn bị: {hpbsnoidung}" |
| bpkpnoidung = arr_to_str(top.get('bpkpnoidung'), sep="; ") |
| if bpkpnoidung: |
| top_result_text += f"\nNgoài ra, người vi phạm còn bị buộc: {bpkpnoidung}" |
| else: |
| result_text = "Không có kết quả phù hợp!" |
|
|
| |
| prompt = ( |
| "Bạn là một trợ lý AI có kiến thức pháp luật, hãy trả lời câu hỏi dựa trên các đoạn luật sau. " |
| "Chỉ sử dụng thông tin có trong các đoạn, không tự đoán.\n" |
| f"\nCác đoạn luật liên quan:\n{full_result_text}" |
| "\n\nHãy trả lời ngắn gọn, dễ hiểu, trích dẫn rõ ràng thông tin từ các đoạn luật nếu cần." |
| f"\n\nCâu hỏi của người dùng: {question}\n" |
| ) |
|
|
| |
| try: |
| answer = await llm_client.generate_text(prompt) |
| if answer and answer.strip(): |
| logger.error(f"LLM trả về câu trả lời: \n\tanswer: {answer}") |
| return answer.strip() |
| else: |
| logger.error(f"LLM không trả về câu trả lời phù hợp: \n\tanswer: {answer}") |
| except Exception as e: |
| logger.error(f"LLM không sẵn sàng: {e}\n{traceback.format_exc()}") |
| |
| fallback = "Tóm tắt các đoạn luật liên quan:\n\n" |
| for i, match in enumerate(matches, 1): |
| fallback += f"Đoạn {i}:\n" |
| tieude = (match.get('tieude') or '').strip() |
| noidung = (match.get('noidung') or '').strip() |
| if tieude or noidung: |
| fallback += f" - Hành vi: {(tieude + ' ' + noidung).strip()}\n" |
| canhantu = arr_to_str(match.get('canhantu')) |
| canhanden = arr_to_str(match.get('canhanden')) |
| if canhantu or canhanden: |
| fallback += f" - Cá nhân bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ\n" |
| tochuctu = arr_to_str(match.get('tochuctu')) |
| tochucden = arr_to_str(match.get('tochucden')) |
| if tochuctu or tochucden: |
| fallback += f" - Tổ chức bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ\n" |
| hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ") |
| if hpbsnoidung: |
| fallback += f" - Hình phạt bổ sung: {hpbsnoidung}\n" |
| bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ") |
| if bpkpnoidung: |
| fallback += f" - Biện pháp khắc phục hậu quả: {bpkpnoidung}\n" |
| fallback += "\n" |
| return fallback.strip() |
|
|
| async def create_facebook_post(page_token: str, sender_id: str, history: List[Dict[str, Any]]) -> str: |
| """ |
| Placeholder: Tạo bài viết mới trên page Facebook. Trả về URL bài viết nếu thành công, None nếu thất bại. |
| """ |
| |
| logger.info(f"[MOCK] Creating Facebook post for sender_id={sender_id} with history={history}") |
| return "https://facebook.com/mock_post_url" |
|
|
| if __name__ == "__main__": |
| import uvicorn |
| logger.info("[STARTUP] Bắt đầu chạy uvicorn server...") |
| uvicorn.run( |
| "app.main:app", |
| host="0.0.0.0", |
| port=port |
| ) |