|
|
import asyncio
import hashlib
import hmac
import json
import re
import time
from typing import Any, Dict, Optional

import facebook
import httpx
import requests
from fastapi import HTTPException, Request
from loguru import logger

from .config import Settings, get_settings
from .utils import timing_decorator_async, timing_decorator_sync, _safe_truncate
|
|
|
|
|
|
|
|
class FacebookClient:
    """Helper for Facebook Messenger webhooks and outgoing messages.

    The client stores an app secret (used to verify webhook signatures) and an
    optional "context" (page id, page token, sender id) that the sending and
    page-info methods fall back to when their arguments are omitted.
    """

    # Default maximum number of characters in one outgoing message part.
    MAX_MESSAGE_LENGTH = 2000

    # Patterns used by format_message, compiled once instead of on every call.
    _BOLD_ASTERISK_RE = re.compile(r"\*\*([^\*]+)\*\*")
    _BOLD_UNDERSCORE_RE = re.compile(r"__([^_]+)__")
    _HEADING_RE = re.compile(r"^#+\s+", flags=re.MULTILINE)
    _BLANK_RUN_RE = re.compile(r"\n{3,}")

    def __init__(
        self,
        app_secret: str,
        page_id: Optional[str] = None,
        page_token: Optional[str] = None,
        sender_id: Optional[str] = None,
    ):
        """Create a client.

        Args:
            app_secret: Facebook App Secret, used as the HMAC key in
                ``verify_signature``.
            page_id: Default page id used by ``get_page_info``.
            page_token: Default page access token used by ``send_message``
                and ``get_page_info``.
            sender_id: Default recipient id used by ``send_message``.
        """
        self.app_secret = app_secret
        self._client = httpx.AsyncClient()
        self.page_id = page_id
        self.page_token = page_token
        self.sender_id = sender_id

    async def aclose(self) -> None:
        """Close the ``httpx.AsyncClient`` created in ``__init__``."""
        await self._client.aclose()

    def update_context(
        self,
        page_id: Optional[str] = None,
        page_token: Optional[str] = None,
        sender_id: Optional[str] = None,
    ):
        """Update the stored context; arguments left as ``None`` are ignored.

        Args:
            page_id: New default page id, or ``None`` to keep the current one.
            page_token: New default page token, or ``None`` to keep it.
            sender_id: New default sender id, or ``None`` to keep it.
        """
        if page_id is not None:
            self.page_id = page_id
        if page_token is not None:
            self.page_token = page_token
        if sender_id is not None:
            self.sender_id = sender_id

    @timing_decorator_async
    async def verify_webhook(
        self, token: str, challenge: str, verify_token: str
    ) -> int:
        """Validate a webhook verification request and return its challenge.

        Args:
            token: Verify token received with the request.
            challenge: Challenge value received with the request.
            verify_token: Verify token the application expects.

        Returns:
            The challenge converted to ``int``.

        Raises:
            HTTPException: 403 when the tokens do not match (or are not
                strings); 400 when the challenge is not an integer.
        """
        tokens_match = (
            isinstance(token, str)
            and isinstance(verify_token, str)
            # Compare as bytes in constant time; compare_digest rejects
            # non-ASCII str arguments, bytes are always accepted.
            and hmac.compare_digest(
                token.encode("utf-8"), verify_token.encode("utf-8")
            )
        )
        if not tokens_match:
            raise HTTPException(status_code=403, detail="Invalid verify token")

        try:
            return int(challenge)
        except (TypeError, ValueError) as exc:
            # A malformed challenge is a client error, not a server failure.
            raise HTTPException(status_code=400, detail="Invalid challenge") from exc

    def verify_signature(self, request: Request, payload: bytes) -> bool:
        """Check the ``X-Hub-Signature-256`` header against ``payload``.

        Args:
            request: Incoming request whose headers carry the signature.
            payload: Bytes the HMAC-SHA256 is computed over (presumably the
                raw request body; confirm with the caller).

        Returns:
            ``True`` when the header has the ``sha256=`` prefix and its hex
            digest matches the HMAC of ``payload`` keyed with the app secret,
            otherwise ``False``.
        """
        prefix = "sha256="
        signature = request.headers.get("X-Hub-Signature-256", "")
        if not signature.startswith(prefix):
            return False

        expected = hmac.new(
            self.app_secret.encode(), payload, hashlib.sha256
        ).hexdigest()

        # Compare bytes: compare_digest raises TypeError for non-ASCII str,
        # and the header value is attacker-controlled.
        received = signature[len(prefix):]
        return hmac.compare_digest(received.encode("utf-8"), expected.encode("utf-8"))

    def format_message(self, text: str) -> str:
        """Rewrite Markdown-style markup in ``text`` into a simpler form.

        Applied in order: ``* `` bullets after a newline become ``- ``
        (``+ `` when preceded by one space), ``**bold**`` and ``__bold__``
        become ``*bold*``, leading ``#`` heading markers are removed, and runs
        of three or more newlines collapse to two.

        Args:
            text: Text to convert.

        Returns:
            The converted text.
        """
        # NOTE(review): both replacements used to be applied twice, which is a
        # no-op; if deeper indentation variants were intended (whitespace may
        # have been lost), add them here. TODO confirm.
        text = text.replace("\n* ", "\n- ")
        text = text.replace("\n * ", "\n + ")

        text = self._BOLD_ASTERISK_RE.sub(r"*\1*", text)
        text = self._BOLD_UNDERSCORE_RE.sub(r"*\1*", text)
        text = self._HEADING_RE.sub("", text)
        text = self._BLANK_RUN_RE.sub("\n\n", text)
        return text

    def split_message(self, text: str, max_length: int = MAX_MESSAGE_LENGTH) -> list:
        """Split ``text`` into chunks of at most ``max_length`` characters.

        Chunks are built from whole lines where possible. A single line longer
        than ``max_length`` is cut into ``max_length``-sized pieces so that no
        content is lost. Trailing whitespace is stripped from every chunk and
        empty chunks are never returned.

        Args:
            text: Text to split.
            max_length: Maximum chunk length; must be at least 1.

        Returns:
            List of non-empty string chunks, in order.

        Raises:
            ValueError: If ``max_length`` is smaller than 1.
        """
        if max_length < 1:
            raise ValueError("max_length must be a positive integer")

        chunks = []
        current = ""

        def flush() -> None:
            # Emit the buffered text (if any non-whitespace remains) and reset.
            nonlocal current
            stripped = current.rstrip()
            if stripped:
                chunks.append(stripped)
            current = ""

        for line in text.split("\n"):
            # Hard-split lines that cannot fit in a single chunk.
            while len(line) > max_length:
                flush()
                current = line[:max_length]
                flush()
                line = line[max_length:]

            # +1 accounts for the newline appended after every line.
            if len(current) + len(line) + 1 > max_length:
                flush()
            current += line + "\n"

        flush()
        return chunks

    def send_message_forwarder(
        self, access_token: str, recipient_id: str, message: str
    ) -> dict:
        """Send a message through the forwarder HTTP API.

        The request is a JSON ``POST`` to ``<base>/api/send-message`` where
        ``<base>`` is ``get_settings().facebook_api_base_url``.

        Args:
            access_token: Facebook Page Access Token.
            recipient_id: Id of the recipient (PSID).
            message: Message text.

        Returns:
            The decoded JSON response on success. On failure a dict with an
            ``"error"`` key (plus ``"details"`` holding the response body for
            HTTP errors) is returned instead of raising.
        """
        api_base_url = get_settings().facebook_api_base_url
        url = f"{api_base_url.rstrip('/')}/api/send-message"
        payload = {
            "recipient_id": recipient_id,
            "access_token": access_token,
            "message": message,
        }
        # Never write the page access token to the logs.
        loggable_payload = json.dumps(
            {**payload, "access_token": "***"}, ensure_ascii=False
        )

        logger.debug(
            f"[FACEBOOK_FORWARDER] Forwarding message to {url}. Full payload: {loggable_payload}"
        )

        try:
            response = requests.post(url, json=payload, timeout=10)
            response.raise_for_status()
            logger.info(
                f"[FACEBOOK_FORWARDER] Forwarder API returned status {response.status_code}."
            )
            return response.json()
        except requests.HTTPError as e:
            failed_response = e.response
            if failed_response is not None:
                status = failed_response.status_code
                error_content = failed_response.text
            else:
                status = "unknown"
                error_content = "No response body"
            logger.error(
                f"[FACEBOOK_FORWARDER] HTTP Error calling forwarder API: {e}. "
                f"Status: {status}. Payload sent: {loggable_payload}. "
                f"Response: {error_content}"
            )
            return {"error": str(e), "details": error_content}
        except requests.RequestException as e:
            logger.error(
                f"[FACEBOOK_FORWARDER] Request Error calling forwarder API: {e}"
            )
            return {"error": str(e)}
        except ValueError as e:
            # Some requests versions raise a plain ValueError when the body is
            # not valid JSON; report it like any other forwarder failure.
            logger.error(
                f"[FACEBOOK_FORWARDER] Forwarder API returned a non-JSON body: {e}"
            )
            return {"error": str(e)}

    def _call_with_retries(
        self,
        operation,
        graph_error_label: str,
        unexpected_error_label: str,
        failure_detail: str,
        max_retries: int = 3,
        initial_delay: float = 1,
    ):
        """Run ``operation()`` with retries and exponential backoff.

        Args:
            operation: Zero-argument callable to execute.
            graph_error_label: Log prefix used for ``facebook.GraphAPIError``.
            unexpected_error_label: Log prefix used for any other exception.
            failure_detail: ``detail`` of the final ``HTTPException``.
            max_retries: Total number of attempts.
            initial_delay: Seconds slept after the first failure; doubled
                after every further failure.

        Returns:
            Whatever ``operation()`` returns.

        Raises:
            HTTPException: 500 once the last attempt has failed.
        """
        retry_delay = initial_delay
        for attempt in range(1, max_retries + 1):
            try:
                return operation()
            except facebook.GraphAPIError as e:
                logger.error(
                    f"{graph_error_label} (attempt {attempt}/{max_retries}): {e}"
                )
                if attempt == max_retries:
                    raise HTTPException(
                        status_code=500, detail=f"{failure_detail}: {e}"
                    ) from e
            except Exception as e:
                logger.error(
                    f"{unexpected_error_label} (attempt {attempt}/{max_retries}): {e}"
                )
                if attempt == max_retries:
                    raise HTTPException(
                        status_code=500, detail=failure_detail
                    ) from e
            # Blocking sleep: the async wrapper in this class runs the sync
            # helpers in an executor thread.
            time.sleep(retry_delay)
            retry_delay *= 2

    def _send_message_sync(
        self, page_access_token: str, recipient_id: str, message: str
    ) -> dict:
        """Send a message with facebook-sdk, retrying up to three times.

        Args:
            page_access_token: Page access token used for the Graph API call.
            recipient_id: Id of the recipient.
            message: Message text.

        Returns:
            The result of the Graph API request.

        Raises:
            HTTPException: 500 when all attempts fail.
        """

        def post_message() -> dict:
            # The timeout is a GraphAPI constructor option; GraphAPI.request()
            # does not accept a ``timeout`` keyword.
            graph = facebook.GraphAPI(
                access_token=page_access_token, timeout=30, version="3.1"
            )
            # post_args are sent form-encoded, so nested objects must be
            # serialised to JSON strings to reach the API intact.
            return graph.request(
                path="me/messages",
                post_args={
                    "recipient": json.dumps({"id": recipient_id}),
                    "message": json.dumps({"text": message}),
                },
            )

        return self._call_with_retries(
            post_message,
            graph_error_label="Facebook GraphAPI Error",
            unexpected_error_label="Unexpected error sending message to Facebook",
            failure_detail="Failed to send message to Facebook",
        )

    def _get_page_info_sync(self, page_access_token: str, page_id: str) -> dict:
        """Fetch a page object with facebook-sdk, retrying up to three times.

        Args:
            page_access_token: Page access token used for the Graph API call.
            page_id: Id of the object to fetch.

        Returns:
            The result of ``GraphAPI.get_object(page_id)``.

        Raises:
            HTTPException: 500 when all attempts fail.
        """

        def fetch_page() -> dict:
            graph = facebook.GraphAPI(access_token=page_access_token, version="3.1")
            return graph.get_object(page_id)

        return self._call_with_retries(
            fetch_page,
            graph_error_label="Facebook GraphAPI Error getting page info",
            unexpected_error_label="Unexpected error getting page info",
            failure_detail="Failed to get page info",
        )

    @timing_decorator_async
    async def send_message(
        self,
        page_access_token: Optional[str] = None,
        recipient_id: Optional[str] = None,
        message: str = "",
    ) -> dict:
        """Format, split and send ``message`` through the forwarder API.

        Missing ``page_access_token`` / ``recipient_id`` fall back to the
        stored ``page_token`` / ``sender_id``. The text is formatted with
        ``format_message``, split with ``split_message`` and each part is sent
        with ``send_message_forwarder`` in the default executor.

        Args:
            page_access_token: Page access token, or ``None`` to use the
                stored one.
            recipient_id: Recipient id, or ``None`` to use the stored one.
            message: Text to send.

        Returns:
            ``{}`` when the message is empty or nothing was sent. Otherwise
            the result of the first part, unless some part failed, in which
            case the first failing part's ``{"error": ...}`` result is
            returned so the failure is visible to the caller.

        Raises:
            ValueError: If no access token or recipient id is available.
        """
        page_access_token = page_access_token or self.page_token
        recipient_id = recipient_id or self.sender_id

        if not message or not str(message).strip():
            logger.warning(
                f"[FACEBOOK_SEND] Attempted to send an empty or whitespace-only message to recipient {recipient_id}. Aborting."
            )
            return {}

        if not page_access_token or not recipient_id:
            logger.error(
                "[FACEBOOK_SEND] Missing page_access_token or recipient_id. Cannot send message."
            )
            raise ValueError(
                "FacebookClient: page_access_token and recipient_id must not be None when sending a message."
            )

        logger.info(
            f"[FACEBOOK_SEND] Preparing to send message to recipient {recipient_id}. Full message (truncated): '{_safe_truncate(str(message))}'"
        )

        # "**" is collapsed to "*" before the remaining formatting is applied.
        response_to_send = self.format_message(str(message).replace("**", "*"))
        parts = self.split_message(response_to_send)
        total = len(parts)
        results = []
        loop = asyncio.get_running_loop()

        for index, part in enumerate(parts, 1):
            # Defensive clamp; split_message already honours the limit.
            if len(part) > self.MAX_MESSAGE_LENGTH:
                part = part[: self.MAX_MESSAGE_LENGTH]

            logger.info(
                f"[FACEBOOK_SEND] Sending part {index}/{total} to recipient {recipient_id}."
            )
            try:
                # The forwarder uses blocking I/O, so run it off the event loop.
                result = await loop.run_in_executor(
                    None,
                    self.send_message_forwarder,
                    page_access_token,
                    recipient_id,
                    part,
                )
                results.append(result)
            except Exception as e:
                logger.error(
                    f"[FACEBOOK_SEND] Failed to send part {index}/{total} to {recipient_id}. Error: {e}"
                )
                results.append({"error": str(e), "part": index})

        if not results:
            return {}

        # Surface a failure of any part instead of hiding it behind part 1.
        first_failure = next(
            (r for r in results if isinstance(r, dict) and "error" in r), None
        )
        return first_failure if first_failure is not None else results[0]

    @timing_decorator_async
    async def get_page_info(
        self, page_access_token: Optional[str] = None, page_id: Optional[str] = None
    ) -> dict:
        """Fetch page information without blocking the event loop.

        Missing arguments fall back to the stored ``page_token`` / ``page_id``.
        The blocking ``_get_page_info_sync`` runs in the default executor.

        Args:
            page_access_token: Page access token, or ``None`` to use the
                stored one.
            page_id: Page id, or ``None`` to use the stored one.

        Returns:
            The result of ``_get_page_info_sync``.

        Raises:
            ValueError: If no access token or page id is available.
        """
        page_access_token = page_access_token or self.page_token
        page_id = page_id or self.page_id
        if not page_access_token or not page_id:
            raise ValueError(
                "FacebookClient: page_access_token and page_id must not be None when getting page info."
            )

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self._get_page_info_sync, page_access_token, page_id
        )

    @timing_decorator_sync
    def parse_message(self, body: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Extract the first messaging event from a webhook payload.

        Only ``body["entry"][0]["messaging"][0]`` is inspected.

        Args:
            body: Decoded JSON payload of the webhook request.

        Returns:
            A dict with ``sender_id``, ``page_id`` (the event's recipient id),
            ``timestamp``, ``text`` (``None`` when absent) and ``attachments``
            (empty list when absent), or ``None`` when the payload does not
            have the expected structure.
        """
        try:
            messaging = body["entry"][0]["messaging"][0]

            message_data = {
                "sender_id": messaging["sender"]["id"],
                "page_id": messaging["recipient"]["id"],
                "timestamp": messaging["timestamp"],
                "text": None,
                "attachments": [],
            }

            if "message" in messaging:
                message = messaging["message"]
                for field in ("text", "attachments"):
                    if field in message:
                        message_data[field] = message[field]

            return message_data
        except (KeyError, IndexError, TypeError) as e:
            # TypeError covers payloads whose nodes have an unexpected type
            # (e.g. ``entry`` is not a list or ``message`` is null).
            logger.error(f"Error parsing Facebook message: {e}\n\n{body}")
            return None
|
|
|