FBChatBot / app /main.py
VietCat's picture
fix vector search
523c69e
raw
history blame
22 kB
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from loguru import logger
import json
from typing import Dict, Any, List
import asyncio
from concurrent.futures import ThreadPoolExecutor
import os
import traceback
import difflib
from .config import Settings, get_settings
from .facebook import FacebookClient
from .sheets import SheetsClient
from .supabase_db import SupabaseClient
from .embedding import EmbeddingClient
from .utils import setup_logging, extract_command, extract_keywords, timing_decorator_async, timing_decorator_sync, ensure_log_dir, validate_config
from .constants import VEHICLE_KEYWORDS, SHEET_RANGE, VEHICLE_KEYWORD_TO_COLUMN
from .health import router as health_router
from .llm import create_llm_client
app = FastAPI(title="WeBot Facebook Messenger API")
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize clients
settings = get_settings()
setup_logging(settings.log_level)
logger.info("[STARTUP] Đang lấy PORT từ biến môi trường hoặc config...")
port = int(os.environ.get("PORT", settings.port if hasattr(settings, 'port') else 7860))
logger.info(f"[STARTUP] PORT sử dụng: {port}")
logger.info("[STARTUP] Khởi tạo FacebookClient...")
facebook_client = FacebookClient(settings.facebook_app_secret)
logger.info("[STARTUP] Khởi tạo SheetsClient...")
sheets_client = SheetsClient(
settings.google_sheets_credentials_file,
settings.google_sheets_token_file,
settings.conversation_sheet_id
)
logger.info("[STARTUP] Khởi tạo SupabaseClient...")
supabase_client = SupabaseClient(settings.supabase_url, settings.supabase_key)
logger.info("[STARTUP] Khởi tạo EmbeddingClient...")
embedding_client = EmbeddingClient()
# Keywords to look for in messages
VEHICLE_KEYWORDS = ["xe máy", "ô tô", "xe đạp", "xe hơi"]
# Khởi tạo LLM client (ví dụ dùng HFS, bạn có thể đổi provider tuỳ ý)
# llm_client = create_llm_client(
# provider="hfs",
# base_url="https://vietcat-gemma34b.hf.space"
# )
# Khởi tạo LLM client Gemini
llm_client = create_llm_client(
provider="gemini",
api_key=settings.gemini_api_key,
base_url=settings.gemini_base_url,
model=settings.gemini_model
)
logger.info("[STARTUP] Mount health router...")
app.include_router(health_router)
logger.info("[STARTUP] Validate config...")
validate_config(settings)
executor = ThreadPoolExecutor(max_workers=4)
message_text = None
def flatten_timestamp(ts):
flat = []
for t in ts:
if isinstance(t, list):
flat.extend(flatten_timestamp(t))
else:
flat.append(t)
return flat
def normalize_vehicle_keyword(keyword: str) -> str:
"""
Chuẩn hoá giá trị phương tiện về đúng từ khoá gần nhất trong VEHICLE_KEYWORDS.
Nếu không khớp, trả về keyword gốc.
"""
if not keyword:
return ""
matches = difflib.get_close_matches(keyword.lower(), [k.lower() for k in VEHICLE_KEYWORDS], n=1, cutoff=0.6)
if matches:
# Trả về đúng case trong VEHICLE_KEYWORDS
for k in VEHICLE_KEYWORDS:
if k.lower() == matches[0]:
return k
return keyword
@app.get("/")
async def root():
"""Endpoint root để kiểm tra trạng thái app."""
logger.info("[HEALTH] Truy cập endpoint root /")
return {"status": "ok"}
@app.get("/webhook")
async def verify_webhook(request: Request):
"""
Xác thực webhook Facebook Messenger.
Input: request (Request) - request từ Facebook với các query params.
Output: Trả về challenge nếu verify thành công, lỗi nếu thất bại.
"""
params = dict(request.query_params)
mode = params.get("hub.mode")
token = str(params.get("hub.verify_token", ""))
challenge = str(params.get("hub.challenge", ""))
if not all([mode, token, challenge]):
raise HTTPException(status_code=400, detail="Missing parameters")
return await facebook_client.verify_webhook(
token,
challenge,
settings.facebook_verify_token
)
@app.post("/webhook")
@timing_decorator_async
async def webhook(request: Request):
"""
Nhận và xử lý message từ Facebook Messenger webhook.
Input: request (Request) - request chứa payload JSON từ Facebook.
Output: JSON status.
"""
logger.info(f"[DEBUG] Nhận message từ Facebook Messenger webhook...")
body_bytes = await request.body()
# Verify request is from Facebook
if not facebook_client.verify_signature(request, body_bytes):
raise HTTPException(status_code=403, detail="Invalid signature")
try:
body = json.loads(body_bytes)
# Kiểm tra an toàn echo
is_echo = (
isinstance(body, dict)
and "entry" in body
and isinstance(body["entry"], list)
and len(body["entry"]) > 0
and "messaging" in body["entry"][0]
and isinstance(body["entry"][0]["messaging"], list)
and len(body["entry"][0]["messaging"]) > 0
and body["entry"][0]["messaging"][0].get("message", {}).get("is_echo", False)
)
if is_echo:
logger.info(f"[DEBUG] Message is echo, skipping...")
return {"status": "ok"}
else:
message_data = facebook_client.parse_message(body)
logger.info(f"[DEBUG] message_data: {message_data}")
if not message_data:
return {"status": "ok"}
# Process the message
await process_message(message_data)
return {"status": "ok"}
except Exception as e:
logger.error(f"Error processing webhook: {e}\nTraceback: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail="Internal server error")
@timing_decorator_async
async def process_message(message_data: Dict[str, Any]):
# Kiểm tra message_data hợp lệ và đủ trường
if not message_data or not isinstance(message_data, dict):
logger.error(f"[ERROR] Invalid message_data: {message_data}")
return
required_fields = ["sender_id", "page_id", "text", "timestamp"]
for field in required_fields:
if field not in message_data:
logger.error(f"[ERROR] Missing field {field} in message_data: {message_data}")
return
sender_id = message_data["sender_id"]
page_id = message_data["page_id"]
message_text = message_data["text"]
timestamp = message_data["timestamp"]
attachments = message_data.get('attachments', [])
logger.bind(user_id=sender_id, page_id=page_id, message=message_text).info("Processing message")
# Nếu không có message_text và attachments, không xử lý
if not message_text and not attachments:
logger.info(f"[DEBUG] Không có message_text và attachments, không xử lý...")
return
# Get conversation history (run in thread pool)
loop = asyncio.get_event_loop()
history = await loop.run_in_executor(
executor, lambda: sheets_client.get_conversation_history(sender_id, page_id)
)
logger.info(f"[DEBUG] history: {history}")
log_kwargs = {
'conversation_id': None,
'recipient_id': sender_id,
'page_id': page_id,
'originaltext': message_text,
'originalcommand': '',
'originalcontent': '',
'originalattachments': attachments,
'originalvehicle': '',
'originalaction': '',
'originalpurpose': '',
'timestamp': [timestamp],
'isdone': False
}
logger.info(f"[DEBUG] Message cơ bản: {log_kwargs}")
conv = None
if history:
# 1. Chặn duplicate message (trùng sender_id, page_id, timestamp)
for row in history:
row_timestamps = flatten_timestamp(row.get('timestamp', []))
if isinstance(row_timestamps, list) and len(row_timestamps) == 1 and isinstance(row_timestamps[0], list):
row_timestamps = row_timestamps[0]
if (
str(timestamp) in [str(ts) for ts in row_timestamps]
and str(row.get('recipient_id')) == str(sender_id)
and str(row.get('page_id')) == str(page_id)
):
logger.info("[DUPLICATE] Message duplicate, skipping log.")
return
conv = {
'conversation_id': row.get('conversation_id'),
'recipient_id': row.get('recipient_id'),
'page_id': row.get('page_id'),
'originaltext': row.get('originaltext'),
'originalcommand': row.get('originalcommand'),
'originalcontent': row.get('originalcontent'),
'originalattachments': row.get('originalattachments'),
'originalvehicle': row.get('originalvehicle'),
'originalaction': row.get('originalaction'),
'originalpurpose': row.get('originalpurpose'),
'timestamp': row_timestamps,
'isdone': row.get('isdone')
}
else:
# 2. Ghi conversation mới NGAY LẬP TỨC với thông tin cơ bản
conv = await loop.run_in_executor(executor, lambda: sheets_client.log_conversation(**log_kwargs))
if not conv:
logger.error("Không thể tạo conversation mới!")
return
else:
logger.info(f"[DEBUG] Message history: {conv}")
for key, value in log_kwargs.items():
if value not in (None, "", []) and conv.get(key) in (None, "", []):
conv[key] = value
# Thêm timestamp mới nếu chưa có
conv['timestamp'] = flatten_timestamp(conv['timestamp'])
if timestamp not in conv['timestamp']:
conv['timestamp'].append(timestamp)
logger.info(f"[DEBUG] Message history sau update: {conv}")
await loop.run_in_executor(executor, lambda: sheets_client.log_conversation(**conv))
# Get page access token (sync)
page_token = supabase_client.get_page_token(page_id)
if page_token:
logger.info(f"[DEBUG] page_token: {page_token[:10]} ... {page_token[-10:]}")
else:
logger.info(f"[DEBUG] page_token: None")
# Nếu không có page_token, không xử lý
if not page_token:
logger.error(f"No access token found for page {page_id}")
return
await facebook_client.send_message(page_token, sender_id, "Ok, để mình check. Bạn chờ mình chút xíu nhé!")
# Extract command and keywords
command, remaining_text = extract_command(message_text)
# Sử dụng LLM để phân tích message_text và extract keywords, mục đích, hành vi vi phạm
llm_analysis = await llm_client.analyze(message_text)
logger.info(f"[LLM][RAW] Kết quả trả về từ analyze: {llm_analysis}")
muc_dich = None
hanh_vi_vi_pham = None
if isinstance(llm_analysis, dict):
keywords = [normalize_vehicle_keyword(llm_analysis.get('phuong_tien', ''))]
muc_dich = llm_analysis.get('muc_dich')
hanh_vi_vi_pham = llm_analysis.get('hanh_vi_vi_pham')
elif isinstance(llm_analysis, list) and len(llm_analysis) > 0:
keywords = [normalize_vehicle_keyword(llm_analysis[0].get('phuong_tien', ''))]
muc_dich = llm_analysis[0].get('muc_dich')
hanh_vi_vi_pham = llm_analysis[0].get('hanh_vi_vi_pham')
else:
keywords = extract_keywords(message_text, VEHICLE_KEYWORDS)
hanh_vi_vi_pham = message_text
for kw in keywords:
hanh_vi_vi_pham = hanh_vi_vi_pham.replace(kw, "")
hanh_vi_vi_pham = hanh_vi_vi_pham.strip()
logger.info(f"[DEBUG] Phương tiện: {keywords} - Hành vi: {hanh_vi_vi_pham} - Mục đích: {muc_dich}")
await facebook_client.send_message(page_token, sender_id, "Mình đang phân tích câu hỏi của bạn.....")
# 4. Update lại conversation với thông tin đầy đủ
update_kwargs = {
'conversation_id': conv['conversation_id'],
'recipient_id': sender_id,
'page_id': page_id,
'originaltext': message_text,
'originalcommand': command,
'originalcontent': remaining_text,
'originalattachments': attachments,
'originalvehicle': ','.join(keywords),
'originalaction': hanh_vi_vi_pham,
'originalpurpose': muc_dich,
'timestamp': flatten_timestamp(conv['timestamp']),
'isdone': False
}
for key, value in update_kwargs.items():
if value not in (None, "", []) and conv.get(key) in (None, "", []):
conv[key] = value
logger.info(f"[DEBUG] Message history update cuối cùng: {conv}")
# 5. Xử lý logic nghiệp vụ
response = await process_business_logic(conv, page_token)
logger.info(f"[DEBUG] Message history sau khi process: {conv}")
# 6. Gửi response và cập nhật final state
await facebook_client.send_message(page_token, sender_id, response)
await loop.run_in_executor(executor, lambda: sheets_client.log_conversation(**conv))
return
async def process_business_logic(log_kwargs: Dict[str, Any], page_token: str) -> str:
"""
Xử lý logic nghiệp vụ dựa trên thông tin conversation.
"""
command = log_kwargs.get('originalcommand', '')
vehicle = log_kwargs.get('originalvehicle', '')
action = log_kwargs.get('originalaction', '')
message = log_kwargs.get('originaltext', '')
# Tách vehicle thành list keywords
keywords = [kw.strip() for kw in vehicle.split(',') if kw.strip()]
if not command:
if keywords:
# Có thông tin phương tiện
if action:
logger.info(f"[DEBUG] tạo embedding: {action}")
embedding = await embedding_client.create_embedding(action)
logger.info(f"[DEBUG] embedding: {embedding[:5]} ... (total {len(embedding)})")
matches = supabase_client.match_documents(embedding, vehicle_keywords=keywords)
logger.info(f"[DEBUG] matches: {matches}")
if matches:
response = await format_search_results(message, matches)
else:
response = "Xin lỗi, tôi không tìm thấy thông tin phù hợp."
else:
logger.info(f"[DEBUG] Không có hành vi vi phạm: {message}")
response = "Xin lỗi, tôi không tìm thấy thông tin về hành vi vi phạm trong câu hỏi của bạn."
log_kwargs['isdone'] = True
else:
# Không có thông tin phương tiện
response = "Vui lòng cho biết loại phương tiện bạn cần tìm (xe máy, ô tô...)"
log_kwargs['isdone'] = False
else:
# Có command
if command == "xong":
# Tạo bài viết mới trên page (placeholder)
# TODO: Thay thế hàm này bằng logic thực tế
post_url = await create_facebook_post(page_token, log_kwargs['recipient_id'], [log_kwargs])
if post_url:
response = f"Bài viết đã được tạo thành công! Bạn có thể xem tại: {post_url}"
else:
response = "Đã xảy ra lỗi khi tạo bài viết. Vui lòng thử lại sau."
log_kwargs['isdone'] = True
else:
response = "Vui lòng cung cấp thêm thông tin và gõ lệnh \\xong khi hoàn tất."
log_kwargs['isdone'] = False
return response
async def format_search_results(question: str, matches: List[Dict[str, Any]]) -> str:
if not matches:
return "Không tìm thấy kết quả phù hợp."
# Tìm item có similarity cao nhất
top = None
top_result_text = ""
full_result_text = ""
def arr_to_str(arr, sep=", "):
if not arr:
return ""
if isinstance(arr, list):
return sep.join([str(x) for x in arr if x not in (None, "")])
return str(arr)
for i, match in enumerate(matches, 1):
if not top or (match.get('similarity', 0) > top.get('similarity', 0)):
top = match
full_result_text += f"\nĐoạn {i}:\n"
tieude = (match.get('tieude') or '').strip()
noidung = (match.get('noidung') or '').strip()
hanhvi = (tieude + "\n" + noidung).strip().replace('\n', ' ')
full_result_text += f"Thực hiện hành vi:\n{hanhvi}"
# Cá nhân bị phạt tiền
canhantu = arr_to_str(match.get('canhantu'))
canhanden = arr_to_str(match.get('canhanden'))
if canhantu or canhanden:
full_result_text += f"\nCá nhân sẽ bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ"
# Tổ chức bị phạt tiền
tochuctu = arr_to_str(match.get('tochuctu'))
tochucden = arr_to_str(match.get('tochucden'))
if tochuctu or tochucden:
full_result_text += f"\nTổ chức sẽ bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ"
# Hình phạt bổ sung
hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ")
if hpbsnoidung:
full_result_text += f"\nNgoài việc bị phạt tiền, người vi phạm còn bị: {hpbsnoidung}"
# Biện pháp khắc phục hậu quả
bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ")
if bpkpnoidung:
full_result_text += f"\nNgoài ra, người vi phạm còn bị buộc: {bpkpnoidung}"
if top and (top.get('tieude') or top.get('noidung')):
tieude = (top.get('tieude') or '').strip()
noidung = (top.get('noidung') or '').strip()
hanhvi = (tieude + "\n" + noidung).strip().replace('\n', ' ')
top_result_text += f"Thực hiện hành vi:\n{hanhvi}"
canhantu = arr_to_str(top.get('canhantu'))
canhanden = arr_to_str(top.get('canhanden'))
if canhantu or canhanden:
top_result_text += f"\nCá nhân sẽ bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ"
tochuctu = arr_to_str(top.get('tochuctu'))
tochucden = arr_to_str(top.get('tochucden'))
if tochuctu or tochucden:
top_result_text += f"\nTổ chức sẽ bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ"
hpbsnoidung = arr_to_str(top.get('hpbsnoidung'), sep="; ")
if hpbsnoidung:
top_result_text += f"\nNgoài việc bị phạt tiền, người vi phạm còn bị: {hpbsnoidung}"
bpkpnoidung = arr_to_str(top.get('bpkpnoidung'), sep="; ")
if bpkpnoidung:
top_result_text += f"\nNgoài ra, người vi phạm còn bị buộc: {bpkpnoidung}"
else:
result_text = "Không có kết quả phù hợp!"
# Prompt cho LLM
prompt = (
"Bạn là một trợ lý AI có kiến thức pháp luật, hãy trả lời câu hỏi dựa trên các đoạn luật sau. "
"Chỉ sử dụng thông tin có trong các đoạn, không tự đoán.\n"
f"\nCác đoạn luật liên quan:\n{full_result_text}"
"\n\nHãy trả lời ngắn gọn, dễ hiểu, trích dẫn rõ ràng thông tin từ các đoạn luật nếu cần."
f"\n\nCâu hỏi của người dùng: {question}\n"
)
# Gọi LLM để sinh câu trả lời, fallback nếu lỗi
try:
answer = await llm_client.generate_text(prompt)
if answer and answer.strip():
logger.error(f"LLM trả về câu trả lời: \n\tanswer: {answer}")
return answer.strip()
else:
logger.error(f"LLM không trả về câu trả lời phù hợp: \n\tanswer: {answer}")
except Exception as e:
logger.error(f"LLM không sẵn sàng: {e}\n{traceback.format_exc()}")
# Fallback: trả về tổng hợp các đoạn luật như cũ
fallback = "Tóm tắt các đoạn luật liên quan:\n\n"
for i, match in enumerate(matches, 1):
fallback += f"Đoạn {i}:\n"
tieude = (match.get('tieude') or '').strip()
noidung = (match.get('noidung') or '').strip()
if tieude or noidung:
fallback += f" - Hành vi: {(tieude + ' ' + noidung).strip()}\n"
canhantu = arr_to_str(match.get('canhantu'))
canhanden = arr_to_str(match.get('canhanden'))
if canhantu or canhanden:
fallback += f" - Cá nhân bị phạt tiền từ {canhantu} VNĐ đến {canhanden} VNĐ\n"
tochuctu = arr_to_str(match.get('tochuctu'))
tochucden = arr_to_str(match.get('tochucden'))
if tochuctu or tochucden:
fallback += f" - Tổ chức bị phạt tiền từ {tochuctu} VNĐ đến {tochucden} VNĐ\n"
hpbsnoidung = arr_to_str(match.get('hpbsnoidung'), sep="; ")
if hpbsnoidung:
fallback += f" - Hình phạt bổ sung: {hpbsnoidung}\n"
bpkpnoidung = arr_to_str(match.get('bpkpnoidung'), sep="; ")
if bpkpnoidung:
fallback += f" - Biện pháp khắc phục hậu quả: {bpkpnoidung}\n"
fallback += "\n"
return fallback.strip()
async def create_facebook_post(page_token: str, sender_id: str, history: List[Dict[str, Any]]) -> str:
"""
Placeholder: Tạo bài viết mới trên page Facebook. Trả về URL bài viết nếu thành công, None nếu thất bại.
"""
# TODO: Thay thế bằng logic thực tế
logger.info(f"[MOCK] Creating Facebook post for sender_id={sender_id} with history={history}")
return "https://facebook.com/mock_post_url"
if __name__ == "__main__":
import uvicorn
logger.info("[STARTUP] Bắt đầu chạy uvicorn server...")
uvicorn.run(
"app.main:app",
host="0.0.0.0",
port=port
)