| import hmac |
| import hashlib |
| import json |
| from typing import Any, Dict, Optional |
| import httpx |
| from fastapi import HTTPException, Request |
| from loguru import logger |
|
|
| from .utils import timing_decorator_async, timing_decorator_sync |
|
|
class FacebookClient:
    """Async helper for the Facebook Messenger webhook and Send API.

    Responsibilities visible in this class:
      * answer the webhook verification handshake (``verify_webhook``),
      * authenticate incoming webhook calls through the
        ``X-Hub-Signature-256`` header (``verify_signature``),
      * send a text message through the Graph API (``send_message``),
      * flatten the first messaging event of a webhook payload
        (``parse_message``).

    The instance owns an ``httpx.AsyncClient``; call ``aclose()`` (or use the
    instance as an async context manager) when it is no longer needed.
    """

    # Messenger Send API endpoint. The page access token is supplied as a
    # query parameter at call time rather than being baked into this string.
    MESSAGES_URL = "https://graph.facebook.com/v18.0/me/messages"

    # Prefix carried by the X-Hub-Signature-256 header value.
    _SIGNATURE_PREFIX = "sha256="

    def __init__(self, app_secret: str):
        """
        Create a client bound to one Facebook App Secret.

        Args:
            app_secret: Facebook App Secret, used as the HMAC key when
                checking webhook signatures.
        """
        self.app_secret = app_secret
        self._client = httpx.AsyncClient()

    async def aclose(self) -> None:
        """Close the underlying HTTP client and release its connections."""
        await self._client.aclose()

    async def __aenter__(self) -> "FacebookClient":
        """Allow ``async with FacebookClient(...) as client:`` usage."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP client when leaving the ``async with`` block."""
        await self.aclose()

    @staticmethod
    def _redact(text: str, secret: str) -> str:
        """Return ``text`` with every occurrence of ``secret`` masked."""
        # An empty needle would make str.replace insert the mask everywhere.
        return text.replace(secret, "***") if secret else text

    @timing_decorator_async
    async def verify_webhook(self, token: str, challenge: str, verify_token: str) -> int:
        """
        Answer the webhook verification handshake.

        Args:
            token: Verify token received with the request.
            challenge: Challenge value received with the request.
            verify_token: Verify token configured for this application.

        Returns:
            The challenge converted to ``int``.

        Raises:
            HTTPException: 403 when the tokens do not match (including when
                either one is missing), 400 when the challenge is not an
                integer.
        """
        # Require real strings on both sides: a plain `!=` would accept the
        # request when both values are None. Compare as bytes in constant
        # time; compare_digest rejects non-ASCII str arguments.
        tokens_match = (
            isinstance(token, str)
            and isinstance(verify_token, str)
            and hmac.compare_digest(token.encode(), verify_token.encode())
        )
        if not tokens_match:
            raise HTTPException(status_code=403, detail="Invalid verify token")

        try:
            return int(challenge)
        except (TypeError, ValueError) as e:
            # A malformed challenge is a client error, not a server crash.
            raise HTTPException(status_code=400, detail="Invalid challenge") from e

    def verify_signature(self, request: Request, payload: bytes) -> bool:
        """
        Check the ``X-Hub-Signature-256`` header against the request body.

        Args:
            request: Incoming request; only its headers are read.
            payload: Raw request body, exactly as received.

        Returns:
            True when the header carries a ``sha256=`` value equal to the
            hex HMAC-SHA256 of ``payload`` keyed with the app secret,
            False otherwise (including a missing or malformed header).
        """
        signature = request.headers.get("X-Hub-Signature-256", "")
        if not signature.startswith(self._SIGNATURE_PREFIX):
            return False

        provided = signature[len(self._SIGNATURE_PREFIX):]
        expected = hmac.new(
            self.app_secret.encode(),
            payload,
            hashlib.sha256,
        ).hexdigest()

        # Compare bytes, not str: compare_digest raises TypeError on str
        # arguments containing non-ASCII characters, which an attacker
        # controlled header could contain.
        return hmac.compare_digest(provided.encode(), expected.encode())

    @timing_decorator_async
    async def send_message(self, page_access_token: str, recipient_id: str, message: str) -> Dict[str, Any]:
        """
        Send a text message to a user through the Messenger Send API.

        Args:
            page_access_token: Access token of the page sending the message.
            recipient_id: Identifier of the user receiving the message.
            message: Text to send.

        Returns:
            The decoded JSON body of the API response.

        Raises:
            HTTPException: 500 when the request fails, the API answers with
                an error status, or the response body is not valid JSON.
        """
        logger.debug(f"Đang gửi tin nhắn đến Facebook Messenger....\n\t{message}")

        payload = {
            "recipient": {"id": recipient_id},
            "message": {"text": message},
        }

        try:
            # Passing the token through `params` lets httpx URL-encode it.
            response = await self._client.post(
                self.MESSAGES_URL,
                params={"access_token": page_access_token},
                json=payload,
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            # str(e) embeds the full request URL, access token included, so
            # log the status and response body instead.
            logger.error(
                "Error sending message to Facebook: "
                f"HTTP {e.response.status_code} - {e.response.text}"
            )
            raise HTTPException(status_code=500, detail="Failed to send message to Facebook") from e
        except (httpx.HTTPError, ValueError) as e:
            # ValueError covers a response body that is not valid JSON.
            # Mask the token in case the error text echoes the URL.
            safe_error = self._redact(str(e), page_access_token)
            logger.error(f"Error sending message to Facebook: {safe_error}")
            raise HTTPException(status_code=500, detail="Failed to send message to Facebook") from e

    @timing_decorator_sync
    def parse_message(self, body: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Extract the first messaging event from a webhook payload.

        Only ``body["entry"][0]["messaging"][0]`` is inspected; any further
        entries or events in the payload are ignored.

        Args:
            body: Decoded JSON payload of the webhook call.

        Returns:
            A dict with the keys ``sender_id``, ``page_id``, ``timestamp``,
            ``text`` (None when absent) and ``attachments`` (empty list when
            absent), or None when the payload does not have the expected
            structure.
        """
        try:
            messaging = body["entry"][0]["messaging"][0]

            message_data = {
                "sender_id": messaging["sender"]["id"],
                "page_id": messaging["recipient"]["id"],
                "timestamp": messaging["timestamp"],
                "text": None,
                "attachments": [],
            }

            if "message" in messaging:
                message = messaging["message"]
                if "text" in message:
                    message_data["text"] = message["text"]
                if "attachments" in message:
                    message_data["attachments"] = message["attachments"]

            return message_data
        except (KeyError, IndexError, TypeError) as e:
            # TypeError: a level of the payload has the wrong type, e.g.
            # "entry" is null or "message" is not an object.
            logger.error(f"Error parsing Facebook message: {e}")
            return None