| import requests |
| import os |
| import logging |
| from typing import Optional, Dict, Any |
| import json |
| import socket |
| import threading |
| import certifi |
|
|
| logger = logging.getLogger(__name__) |
|
|
| WHATSAPP_API_VERSION = os.getenv("WHATSAPP_API_VERSION", "v22.0") |
| WHATSAPP_TOKEN = os.environ["whatsapp_token"] |
| PHONE_NUMBER_ID = os.environ["phone_number_id"] |
| BASE_URL = f"https://graph.facebook.com/{WHATSAPP_API_VERSION}/{PHONE_NUMBER_ID}" |
| HEADERS = { |
| "Authorization": f"Bearer {WHATSAPP_TOKEN}", |
| "Content-Type": "application/json", |
| } |
|
|
| |
| |
| |
| _PROXY_URL = os.getenv("WHATSAPP_PROXY_URL", "").rstrip("/") |
|
|
| |
| |
| |
| |
| _dns_overrides: dict = {} |
| _dns_lock = threading.Lock() |
| _orig_getaddrinfo = socket.getaddrinfo |
|
|
|
|
| def _patched_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0): |
| with _dns_lock: |
| override_ip = _dns_overrides.get(host) |
| if override_ip: |
| logger.debug(f"DNS override: {host} -> {override_ip}") |
| return _orig_getaddrinfo(override_ip, port, family, type, proto, flags) |
| return _orig_getaddrinfo(host, port, family, type, proto, flags) |
|
|
|
|
| socket.getaddrinfo = _patched_getaddrinfo |
|
|
|
|
| def set_db(db_client) -> None: |
| """No-op kept for compatibility β outbox pattern replaced by CF relay.""" |
| pass |
|
|
|
|
| def configure_session(config) -> None: |
| """Register socket-level DNS overrides (used only as fallback).""" |
| if not isinstance(config, dict): |
| return |
| with _dns_lock: |
| _dns_overrides.update(config) |
| logger.info(f"configure_session: DNS overrides registered: {config}") |
|
|
|
|
| |
| |
| |
| _session = requests.Session() |
| _session.headers.update(HEADERS) |
| _session.verify = certifi.where() |
| _session.headers.update({"Connection": "close"}) |
|
|
|
|
| |
| |
| |
|
|
| def send_message(recipient_id: str, message_data: Dict[str, Any]) -> bool: |
| """ |
| Send a WhatsApp message. |
| If WHATSAPP_PROXY_URL is set, calls the Cloudflare Worker /send endpoint. |
| The worker then calls graph.facebook.com β bypassing HF's outbound block. |
| Falls back to direct API call if no proxy is configured. |
| """ |
| if _PROXY_URL: |
| return _send_via_proxy(recipient_id, message_data) |
| return _send_direct(recipient_id, message_data) |
|
|
|
|
| def _send_via_proxy(recipient_id: str, message_data: Dict[str, Any]) -> bool: |
| """POST to Cloudflare Worker /send β worker relays to graph.facebook.com.""" |
| url = f"{_PROXY_URL}/send" |
| try: |
| |
| |
| resp = _session.post( |
| url, |
| json={"recipient_id": recipient_id, "message_data": message_data}, |
| timeout=30, |
| headers={"Content-Type": "application/json"}, |
| ) |
| resp.raise_for_status() |
| logger.info(f"Message relayed via CF worker to {recipient_id}: {resp.json()}") |
| return True |
| except requests.exceptions.RequestException as e: |
| logger.error(f"CF relay failed for {recipient_id}: {e} β trying direct send") |
| if hasattr(e, "response") and e.response is not None: |
| logger.error(f"Status: {e.response.status_code} Body: {e.response.text}") |
| return _send_direct(recipient_id, message_data) |
| except Exception as e: |
| logger.error(f"Unexpected error in CF relay for {recipient_id}: {e}") |
| return _send_direct(recipient_id, message_data) |
|
|
|
|
| def _send_direct(recipient_id: str, message_data: Dict[str, Any]) -> bool: |
| """Send directly via WhatsApp Cloud API (fallback / non-HF environments).""" |
| url = f"{BASE_URL}/messages" |
| payload = { |
| "messaging_product": "whatsapp", |
| "recipient_type": "individual", |
| "to": recipient_id, |
| **message_data, |
| } |
| try: |
| resp = _session.post(url, json=payload) |
| resp.raise_for_status() |
| logger.info(f"Message sent directly to {recipient_id}: {resp.json()}") |
| return True |
| except requests.exceptions.RequestException as e: |
| logger.error(f"Error sending message to {recipient_id}: {e}") |
| if hasattr(e, "response") and e.response is not None: |
| logger.error(f"Status: {e.response.status_code} Body: {e.response.text}") |
| return False |
| except Exception as e: |
| logger.error(f"Unexpected error sending message: {e}") |
| return False |
|
|
|
|
| |
| |
| |
|
|
| def send_text_message(recipient_id: str, text: str) -> bool: |
| return send_message(recipient_id, { |
| "type": "text", |
| "text": {"preview_url": False, "body": text}, |
| }) |
|
|
|
|
| def send_image_message(recipient_id: str, |
| image_url: Optional[str] = None, |
| image_id: Optional[str] = None) -> bool: |
| if not (image_url or image_id): |
| logger.error("send_image_message: need image_url or image_id") |
| return False |
| img = {"id": image_id} if image_id else {"link": image_url} |
| return send_message(recipient_id, {"type": "image", "image": img}) |
|
|
|
|
| def send_reply_buttons(recipient_id: str, body_text: str, button_data: list) -> bool: |
| valid_buttons = [] |
| for btn in button_data[:3]: |
| reply = btn.get("reply", {}) |
| btn_id = str(reply.get("id", "")).strip()[:256] |
| btn_title = str(reply.get("title", "")).strip()[:20] |
| if btn_id and btn_title: |
| valid_buttons.append({"type": "reply", "reply": {"id": btn_id, "title": btn_title}}) |
| else: |
| logger.warning(f"Skipping invalid button: {btn}") |
| if not valid_buttons: |
| logger.error("No valid buttons for interactive message.") |
| return False |
| return send_message(recipient_id, { |
| "type": "interactive", |
| "interactive": { |
| "type": "button", |
| "body": {"text": str(body_text)[:1024]}, |
| "action": {"buttons": valid_buttons}, |
| }, |
| }) |
|
|
|
|
| def get_media_url(media_id: str) -> Optional[str]: |
| url = f"https://graph.facebook.com/{WHATSAPP_API_VERSION}/{media_id}" |
| try: |
| resp = _session.get(url) |
| resp.raise_for_status() |
| data = resp.json() |
| logger.info(f"Media URL for {media_id}: {data.get('url')}") |
| return data.get("url") |
| except requests.exceptions.RequestException as e: |
| logger.error(f"Error getting media URL {media_id}: {e}") |
| if hasattr(e, "response") and e.response is not None: |
| logger.error(f"Status: {e.response.status_code} Body: {e.response.text}") |
| return None |
| except Exception as e: |
| logger.error(f"Unexpected error getting media URL: {e}") |
| return None |
|
|
|
|
| def download_media(media_url: str, save_path: str) -> Optional[str]: |
| try: |
| resp = _session.get(media_url, stream=True) |
| resp.raise_for_status() |
| os.makedirs(os.path.dirname(save_path), exist_ok=True) |
| with open(save_path, "wb") as f: |
| for chunk in resp.iter_content(chunk_size=8192): |
| f.write(chunk) |
| logger.info(f"Media saved to {save_path}") |
| return save_path |
| except requests.exceptions.RequestException as e: |
| logger.error(f"Error downloading from {media_url}: {e}") |
| if hasattr(e, "response") and e.response is not None: |
| logger.error(f"Status: {e.response.status_code} Body: {e.response.text}") |
| return None |
| except Exception as e: |
| logger.error(f"Unexpected error downloading media: {e}") |
| return None |
|
|
|
|
| def upload_media(file_path: str, mime_type: str = "audio/mpeg") -> Optional[str]: |
| url = f"{BASE_URL}/media" |
| try: |
| if not os.path.isfile(file_path): |
| logger.error(f"upload_media: file not found: {file_path}") |
| return None |
| with open(file_path, "rb") as fh: |
| files = {"file": (os.path.basename(file_path), fh, mime_type)} |
| data = {"messaging_product": "whatsapp", "type": mime_type} |
| resp = _session.post(url, files=files, data=data) |
| resp.raise_for_status() |
| media_id = resp.json().get("id") |
| logger.info(f"upload_media: uploaded {file_path} -> media_id={media_id}") |
| return media_id |
| except Exception as e: |
| logger.error(f"upload_media: failed: {e}", exc_info=True) |
| return None |
|
|
|
|
| def send_audio_message(recipient_id: str, |
| audio_path: Optional[str] = None, |
| audio_url: Optional[str] = None, |
| audio_id: Optional[str] = None) -> bool: |
| final_audio_id = audio_id |
| if not final_audio_id and audio_path: |
| final_audio_id = upload_media(audio_path, mime_type="audio/mpeg") |
| if final_audio_id: |
| return send_message(recipient_id, {"type": "audio", "audio": {"id": final_audio_id}}) |
| if audio_url: |
| return send_message(recipient_id, {"type": "audio", "audio": {"link": audio_url}}) |
| logger.error("send_audio_message: need audio_id, audio_url, or audio_path") |
| return False |
|
|
|
|
| def get_message_details(data: Dict[str, Any]) -> Optional[Dict[str, Any]]: |
| try: |
| if "entry" not in data or not data["entry"]: |
| return None |
| change = data["entry"][0].get("changes", [{}])[0] |
| if change.get("field") != "messages": |
| return None |
| msg = change.get("value", {}).get("messages", [{}])[0] |
| if not msg: |
| return None |
|
|
| details = { |
| "type": msg.get("type"), |
| "from": msg.get("from"), |
| "id": msg.get("id"), |
| "timestamp": msg.get("timestamp"), |
| } |
|
|
| if details["type"] == "text": |
| details["text"] = msg.get("text", {}).get("body") |
| elif details["type"] == "audio": |
| details["audio_id"] = msg.get("audio", {}).get("id") |
| elif details["type"] == "image": |
| image_body = msg.get("image", {}) |
| details["image_id"] = image_body.get("id") |
| details["caption"] = image_body.get("caption") |
| elif details["type"] == "interactive": |
| itype = msg.get("interactive", {}).get("type") |
| details["interactive_type"] = itype |
| if itype == "button_reply": |
| br = msg["interactive"]["button_reply"] |
| details["button_reply_id"] = br.get("id") |
| details["button_reply_title"] = br.get("title") |
|
|
| if not all([details.get("type"), details.get("from"), details.get("id")]): |
| logger.warning(f"Incomplete message details: {details}") |
| return None |
|
|
| return details |
|
|
| except Exception as e: |
| logger.error(f"Error parsing webhook data: {e}\nPayload: {json.dumps(data)}") |
| return None |