smart-w / whatsapp_client.py
rairo's picture
Update whatsapp_client.py
4101f38 verified
import requests
import os
import logging
from typing import Optional, Dict, Any
import json
import socket
import threading
import certifi
# Module-level logger; handler and level configuration is left to the application.
logger = logging.getLogger(__name__)

# Graph API version segment used in every URL; overridable via the environment.
WHATSAPP_API_VERSION = os.getenv("WHATSAPP_API_VERSION", "v22.0")

# Required credentials. They are read with os.environ[...], so a missing
# variable raises KeyError as soon as this module is imported.
WHATSAPP_TOKEN = os.environ["whatsapp_token"]
PHONE_NUMBER_ID = os.environ["phone_number_id"]

# Root of the endpoints scoped to the configured phone number.
BASE_URL = "https://graph.facebook.com/{}/{}".format(
    WHATSAPP_API_VERSION, PHONE_NUMBER_ID
)

# Default headers for authenticated JSON requests.
HEADERS = {
    "Authorization": "Bearer " + WHATSAPP_TOKEN,
    "Content-Type": "application/json",
}
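# Cloudflare Worker relay URL — set via the WHATSAPP_PROXY_URL env var.
# When set, messages sent with send_message() go through the worker (with a
# direct API call as fallback); the worker has no issue reaching
# graph.facebook.com. The media helpers still call the Graph API directly.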
# Relay base URL with any trailing "/" removed; empty string when the variable is unset.
_PROXY_URL = os.environ.get("WHATSAPP_PROXY_URL", "").rstrip("/")
# ---------------------------------------------------------------------------
# Socket-level DNS patch — only needed for graph.facebook.com when no proxy.
# When using the CF worker relay, HF never contacts Meta IPs directly.
# ---------------------------------------------------------------------------
# Handle on the resolver that was installed when this module loaded, so that
# lookups can still be delegated to it.
_orig_getaddrinfo = socket.getaddrinfo

# Lock and table for the host -> replacement-address overrides.
_dns_lock = threading.Lock()
_dns_overrides: dict = dict()
def _patched_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
    """Resolve *host*, substituting a registered override address if one exists.

    The parameters mirror ``socket.getaddrinfo`` (which is why the builtin
    name ``type`` is shadowed). The lookup itself is delegated to the resolver
    saved in ``_orig_getaddrinfo``; only the host argument may be swapped.
    """
    # Hold the lock only for the dictionary read, not for the lookup itself.
    with _dns_lock:
        replacement = _dns_overrides.get(host)

    target = host
    if replacement:
        logger.debug(f"DNS override: {host} -> {replacement}")
        target = replacement
    return _orig_getaddrinfo(target, port, family, type, proto, flags)


# Install the wrapper on the socket module, so every socket.getaddrinfo call
# in this process goes through it.
socket.getaddrinfo = _patched_getaddrinfo
def set_db(db_client) -> None:
    """Accept and ignore *db_client*.

    Kept only for compatibility with existing callers: the outbox pattern was
    replaced by the Cloudflare relay, so there is nothing to store.
    """
    return None
def configure_session(config) -> None:
    """Merge *config* into the socket-level DNS override table (fallback use).

    Args:
        config: Mapping whose entries are copied into ``_dns_overrides``.
            Any value that is not a ``dict`` is silently ignored.
    """
    if isinstance(config, dict):
        with _dns_lock:
            _dns_overrides.update(config)
        logger.info(f"configure_session: DNS overrides registered: {config}")
# ---------------------------------------------------------------------------
# Shared requests session
# ---------------------------------------------------------------------------
# One session object shared by every helper in this module.
_session = requests.Session()
# Verify TLS certificates against the CA bundle shipped with certifi.
_session.verify = certifi.where()
# Session-wide default headers: the API headers plus "Connection: close".
_session.headers.update({**HEADERS, "Connection": "close"})
# ---------------------------------------------------------------------------
# Core send — routes through CF worker if WHATSAPP_PROXY_URL is set
# ---------------------------------------------------------------------------
def send_message(recipient_id: str, message_data: Dict[str, Any]) -> bool:
    """Send a WhatsApp message, preferring the relay when one is configured.

    When ``_PROXY_URL`` is non-empty the message is handed to
    ``_send_via_proxy`` (the Cloudflare Worker relay); otherwise it is handed
    to ``_send_direct`` (a direct WhatsApp Cloud API call).

    Args:
        recipient_id: Identifier of the recipient the message is addressed to.
        message_data: Message fields passed through to the chosen sender.

    Returns:
        The boolean result of whichever sender handled the call.
    """
    sender = _send_via_proxy if _PROXY_URL else _send_direct
    return sender(recipient_id, message_data)
def _send_via_proxy(recipient_id: str, message_data: Dict[str, Any]) -> bool:
    """Relay a message through the Cloudflare Worker, falling back to a direct send.

    POSTs ``{"recipient_id": ..., "message_data": ...}`` as JSON to
    ``{_PROXY_URL}/send`` with a 30 second timeout.

    Args:
        recipient_id: Identifier of the recipient the message is addressed to.
        message_data: Message fields forwarded unchanged to the worker.

    Returns:
        True once the worker answers with a non-error HTTP status. If the
        request fails for any reason, the result of ``_send_direct`` is
        returned instead.
    """
    url = f"{_PROXY_URL}/send"
    try:
        # The getaddrinfo patch is installed on the socket module itself, so
        # it applies to any HTTP call; going through _session additionally
        # reuses its CA bundle and default headers. (Original note: HF cannot
        # resolve .workers.dev via system DNS.)
        # NOTE(review): the session defaults include the "Authorization:
        # Bearer" header, so the WhatsApp token is sent to the worker as
        # well — confirm the worker actually needs it.
        resp = _session.post(
            url,
            json={"recipient_id": recipient_id, "message_data": message_data},
            timeout=30,
            headers={"Content-Type": "application/json"},
        )
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error(f"CF relay failed for {recipient_id}: {e} — trying direct send")
        if hasattr(e, "response") and e.response is not None:
            logger.error(f"Status: {e.response.status_code} Body: {e.response.text}")
        return _send_direct(recipient_id, message_data)
    except Exception as e:
        logger.error(f"Unexpected error in CF relay for {recipient_id}: {e}")
        return _send_direct(recipient_id, message_data)

    # The relay has already accepted the message at this point. Decoding the
    # body is only for the log line, so a non-JSON body must not count as a
    # failure: falling back here would deliver the message a second time.
    try:
        body = resp.json()
    except ValueError:
        body = resp.text
    logger.info(f"Message relayed via CF worker to {recipient_id}: {body}")
    return True
def _send_direct(recipient_id: str, message_data: Dict[str, Any]) -> bool:
    """Send a message straight to the WhatsApp Cloud API ``/messages`` endpoint.

    Used when no relay is configured and as the fallback when the relay fails.

    Args:
        recipient_id: Value placed in the payload's ``to`` field.
        message_data: Message fields merged into the payload. They are
            unpacked last, so a key that collides with a base field
            (``messaging_product``, ``recipient_type``, ``to``) overrides it.

    Returns:
        True if the API answered with a non-error HTTP status, False if the
        request failed or an error status was returned.
    """
    url = f"{BASE_URL}/messages"
    payload = {
        "messaging_product": "whatsapp",
        "recipient_type": "individual",
        "to": recipient_id,
        **message_data,
    }
    try:
        # requests has no default timeout: without one a stalled connection
        # would block the caller indefinitely.
        resp = _session.post(url, json=payload, timeout=30)
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error(f"Error sending message to {recipient_id}: {e}")
        if hasattr(e, "response") and e.response is not None:
            logger.error(f"Status: {e.response.status_code} Body: {e.response.text}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error sending message: {e}")
        return False

    # The send succeeded; decoding the body is only for the log line, so a
    # non-JSON body must not turn the result into a failure.
    try:
        body = resp.json()
    except ValueError:
        body = resp.text
    logger.info(f"Message sent directly to {recipient_id}: {body}")
    return True
# ---------------------------------------------------------------------------
# Remaining API helpers — unchanged
# ---------------------------------------------------------------------------
def send_text_message(recipient_id: str, text: str, *, preview_url: bool = False) -> bool:
    """Send a plain text message.

    Args:
        recipient_id: Identifier of the recipient the message is addressed to.
        text: Message body.
        preview_url: Value of the text object's ``preview_url`` flag.
            Keyword-only; defaults to False, which matches the previously
            hard-coded value.

    Returns:
        The result of ``send_message``.
    """
    return send_message(recipient_id, {
        "type": "text",
        "text": {"preview_url": bool(preview_url), "body": text},
    })
def send_image_message(recipient_id: str,
                       image_url: Optional[str] = None,
                       image_id: Optional[str] = None) -> bool:
    """Send an image referenced by media id or by link.

    Args:
        recipient_id: Identifier of the recipient the message is addressed to.
        image_url: Link to the image; used only when ``image_id`` is falsy.
        image_id: Media id of the image; takes precedence over ``image_url``.

    Returns:
        False (after logging an error) when neither reference is given,
        otherwise the result of ``send_message``.
    """
    if image_id:
        image_payload = {"id": image_id}
    elif image_url:
        image_payload = {"link": image_url}
    else:
        logger.error("send_image_message: need image_url or image_id")
        return False
    return send_message(recipient_id, {"type": "image", "image": image_payload})
def send_reply_buttons(recipient_id: str, body_text: str, button_data: list) -> bool:
    """Send an interactive message with reply buttons.

    Only the first three entries of ``button_data`` are considered. Each entry
    is read as ``{"reply": {"id": ..., "title": ...}}``; the id is stripped and
    cut to 256 characters, the title to 20. Entries whose id or title ends up
    empty are skipped with a warning.

    Args:
        recipient_id: Identifier of the recipient the message is addressed to.
        body_text: Body of the message; converted to ``str`` and cut to 1024
            characters.
        button_data: List of button dictionaries as described above.

    Returns:
        False (after logging an error) when no usable button remains,
        otherwise the result of ``send_message``.
    """
    accepted = []
    for candidate in button_data[:3]:
        reply_spec = candidate.get("reply", {})
        identifier = str(reply_spec.get("id", "")).strip()[:256]
        label = str(reply_spec.get("title", "")).strip()[:20]
        if not (identifier and label):
            logger.warning(f"Skipping invalid button: {candidate}")
            continue
        accepted.append({"type": "reply", "reply": {"id": identifier, "title": label}})

    if not accepted:
        logger.error("No valid buttons for interactive message.")
        return False

    interactive = {
        "type": "button",
        "body": {"text": str(body_text)[:1024]},
        "action": {"buttons": accepted},
    }
    return send_message(recipient_id, {"type": "interactive", "interactive": interactive})
def get_media_url(media_id: str) -> Optional[str]:
    """Look up the download URL for a media id via the Graph API.

    Args:
        media_id: Media id appended to the versioned Graph API base URL.

    Returns:
        The ``url`` field of the JSON response (None if the field is absent),
        or None when the request or the response handling fails.
    """
    url = f"https://graph.facebook.com/{WHATSAPP_API_VERSION}/{media_id}"
    try:
        # requests has no default timeout: without one a stalled connection
        # would block the caller indefinitely.
        resp = _session.get(url, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        logger.info(f"Media URL for {media_id}: {data.get('url')}")
        return data.get("url")
    except requests.exceptions.RequestException as e:
        logger.error(f"Error getting media URL {media_id}: {e}")
        if hasattr(e, "response") and e.response is not None:
            logger.error(f"Status: {e.response.status_code} Body: {e.response.text}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error getting media URL: {e}")
        return None
def download_media(media_url: str, save_path: str) -> Optional[str]:
    """Stream the content at *media_url* into the file *save_path*.

    The parent directory of ``save_path`` is created when the path has one.
    The response is read in 8192-byte chunks. If the transfer fails part-way,
    a partially written file may remain on disk.

    Args:
        media_url: URL to download, requested through the shared session.
        save_path: Destination file path; a bare filename is accepted.

    Returns:
        ``save_path`` on success, None on any failure.
    """
    try:
        # The context manager releases the streamed connection on every exit
        # path; the timeout keeps a stalled transfer from blocking forever.
        with _session.get(media_url, stream=True, timeout=30) as resp:
            resp.raise_for_status()
            # os.makedirs("") raises, so only create a directory when the
            # path actually contains one.
            target_dir = os.path.dirname(save_path)
            if target_dir:
                os.makedirs(target_dir, exist_ok=True)
            with open(save_path, "wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    f.write(chunk)
        logger.info(f"Media saved to {save_path}")
        return save_path
    except requests.exceptions.RequestException as e:
        logger.error(f"Error downloading from {media_url}: {e}")
        if hasattr(e, "response") and e.response is not None:
            logger.error(f"Status: {e.response.status_code} Body: {e.response.text}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error downloading media: {e}")
        return None
def upload_media(file_path: str, mime_type: str = "audio/mpeg") -> Optional[str]:
    """Upload a local file to the ``/media`` endpoint and return its media id.

    Args:
        file_path: Path of the file to upload; must be an existing file.
        mime_type: MIME type sent both for the file part and in the ``type``
            form field.

    Returns:
        The ``id`` field of the JSON response, or None when the file is
        missing or the upload fails.
    """
    url = f"{BASE_URL}/media"
    try:
        if not os.path.isfile(file_path):
            logger.error(f"upload_media: file not found: {file_path}")
            return None
        with open(file_path, "rb") as fh:
            files = {"file": (os.path.basename(file_path), fh, mime_type)}
            data = {"messaging_product": "whatsapp", "type": mime_type}
            resp = _session.post(
                url,
                files=files,
                data=data,
                # The session default is "Content-Type: application/json".
                # requests only writes the multipart header (with its
                # boundary) when no Content-Type is present, so the default
                # is dropped for this request by mapping it to None.
                headers={"Content-Type": None},
                # No default timeout exists in requests; uploads get a more
                # generous limit than the JSON calls.
                timeout=60,
            )
        resp.raise_for_status()
        media_id = resp.json().get("id")
        logger.info(f"upload_media: uploaded {file_path} -> media_id={media_id}")
        return media_id
    except Exception as e:
        logger.error(f"upload_media: failed: {e}", exc_info=True)
        return None
def send_audio_message(recipient_id: str,
                       audio_path: Optional[str] = None,
                       audio_url: Optional[str] = None,
                       audio_id: Optional[str] = None) -> bool:
    """Send an audio message from a media id, a local file, or a link.

    Precedence: ``audio_id`` is used if given; otherwise ``audio_path`` is
    uploaded with ``upload_media`` (as ``audio/mpeg``) and the resulting id is
    used; if no id is available, ``audio_url`` is sent as a link.

    Args:
        recipient_id: Identifier of the recipient the message is addressed to.
        audio_path: Local file to upload when no ``audio_id`` is supplied.
        audio_url: Link used when no media id could be obtained.
        audio_id: Media id of audio that has already been uploaded.

    Returns:
        False (after logging an error) when no audio reference is usable,
        otherwise the result of ``send_message``.
    """
    if audio_id:
        media_id = audio_id
    elif audio_path:
        media_id = upload_media(audio_path, mime_type="audio/mpeg")
    else:
        media_id = None

    if media_id:
        audio_payload = {"id": media_id}
    elif audio_url:
        audio_payload = {"link": audio_url}
    else:
        logger.error("send_audio_message: need audio_id, audio_url, or audio_path")
        return False
    return send_message(recipient_id, {"type": "audio", "audio": audio_payload})
def get_message_details(data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Extract the first message of a webhook payload into a flat dictionary.

    Only ``data["entry"][0]["changes"][0]`` is inspected, and only when its
    ``field`` is ``"messages"``. The result always carries ``type``, ``from``,
    ``id`` and ``timestamp``, plus type-specific keys:

    * text: ``text``
    * audio: ``audio_id``
    * image: ``image_id``, ``caption``
    * interactive: ``interactive_type`` and, for ``button_reply``,
      ``button_reply_id`` / ``button_reply_title``

    Args:
        data: Decoded webhook payload.

    Returns:
        The details dictionary, or None when the payload holds no message,
        lacks ``type``/``from``/``id``, or cannot be parsed.
    """
    try:
        if "entry" not in data or not data["entry"]:
            return None

        # Missing or empty lists mean "nothing to process", not a parse
        # error, so they return quietly instead of raising into the handler.
        changes = data["entry"][0].get("changes")
        if not changes:
            return None
        change = changes[0]
        if change.get("field") != "messages":
            return None

        messages = change.get("value", {}).get("messages")
        if not messages:
            return None
        msg = messages[0]
        if not msg:
            return None

        details = {
            "type": msg.get("type"),
            "from": msg.get("from"),
            "id": msg.get("id"),
            "timestamp": msg.get("timestamp"),
        }
        if details["type"] == "text":
            details["text"] = msg.get("text", {}).get("body")
        elif details["type"] == "audio":
            details["audio_id"] = msg.get("audio", {}).get("id")
        elif details["type"] == "image":
            image_body = msg.get("image", {})
            details["image_id"] = image_body.get("id")
            details["caption"] = image_body.get("caption")
        elif details["type"] == "interactive":
            itype = msg.get("interactive", {}).get("type")
            details["interactive_type"] = itype
            if itype == "button_reply":
                # A button_reply without its payload is malformed; the
                # KeyError is handled below and yields None.
                br = msg["interactive"]["button_reply"]
                details["button_reply_id"] = br.get("id")
                details["button_reply_title"] = br.get("title")

        if not all([details.get("type"), details.get("from"), details.get("id")]):
            logger.warning(f"Incomplete message details: {details}")
            return None
        return details
    except Exception as e:
        # Serialising the payload is only for the log line; fall back to
        # repr() so an unserialisable payload cannot raise out of the handler.
        try:
            payload_text = json.dumps(data)
        except (TypeError, ValueError):
            payload_text = repr(data)
        logger.error(f"Error parsing webhook data: {e}\nPayload: {payload_text}")
        return None