File size: 3,192 Bytes
0e84a1f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 | import hashlib
from typing import Any
from ..config import get_settings
from ..models import PageContext, Channel
from ..schemas import NormalizedInboundEvent
def hash_sender(sender_id: str | None) -> str | None:
return hashlib.sha256(sender_id.encode("utf-8")).hexdigest() if sender_id else None
def resolve_page_context(platform_page_id: str | None) -> PageContext:
    """Look up the ``PageContext`` whose configured id equals *platform_page_id*.

    The id is coerced to ``str`` (``None`` and other falsy values become
    ``""``) and compared against ``settings.page_env(slug, "id")`` for each
    slug yielded by ``settings.page_contexts()``, in iteration order. The
    first matching slug wins.

    Raises:
        ValueError: If no slug's configured id equals the given id.
    """
    config = get_settings()
    wanted_id = str(platform_page_id or "")
    for candidate in config.page_contexts():
        is_match = wanted_id == config.page_env(candidate, "id")
        if is_match:
            return PageContext(candidate)
    raise ValueError(f"Unrecognized platform_page_id: {platform_page_id}")
def normalize_meta_payload(payload: dict[str, Any]) -> list[NormalizedInboundEvent]:
    """Flatten a Meta webhook payload into a list of normalized inbound events.

    Every item under ``payload["entry"]`` is resolved to a page context via
    ``resolve_page_context`` and then scanned for two kinds of content:

    * ``messaging`` items yield a ``"message"`` event and/or a ``"postback"``
      event on ``Channel.messenger``.
    * ``changes`` items yield a ``"<field>:<verb>"`` event when the field is
      ``feed``, ``comments`` or ``mention``, and a ``"leadgen"`` event when
      the field is ``leadgen``. Changes for any other field are ignored.

    Missing or ``null`` containers (``entry``, ``messaging``, ``changes``,
    ``sender``, ``recipient``, ``message``, ``postback``, ``value``,
    ``from``) are treated as empty rather than raising.

    Args:
        payload: Decoded webhook body.

    Returns:
        The normalized events in payload order; an empty list when the
        payload carries no entries.

    Raises:
        ValueError: Propagated from ``resolve_page_context`` when an entry's
            id does not match any configured page.
    """
    events: list[NormalizedInboundEvent] = []
    for entry in payload.get("entry") or []:
        page_id = str(entry.get("id", ""))
        page_context = resolve_page_context(page_id)

        for messaging in entry.get("messaging") or []:
            # Use `or {}` instead of a `.get` default: a default only covers
            # a missing key, while an explicit null would still yield None
            # and make the chained `.get("id")` raise AttributeError.
            sender_id = (messaging.get("sender") or {}).get("id")
            recipient_id = (messaging.get("recipient") or {}).get("id")
            message = messaging.get("message") or {}
            postback = messaging.get("postback") or {}
            if not (message or postback):
                continue

            # Shared by both event kinds, so hash the sender only once.
            sender_hash = hash_sender(sender_id)
            if message:
                events.append(
                    NormalizedInboundEvent(
                        page_context=page_context,
                        platform_page_id=page_id or recipient_id,
                        channel=Channel.messenger,
                        event_type="message",
                        sender_id_hash=sender_hash,
                        object_id=message.get("mid"),
                        message_text=message.get("text"),
                        normalized_payload={
                            "sender_psid": sender_id,
                            "recipient_id": recipient_id,
                            "message": message,
                        },
                    )
                )
            if postback:
                events.append(
                    NormalizedInboundEvent(
                        page_context=page_context,
                        platform_page_id=page_id or recipient_id,
                        channel=Channel.messenger,
                        event_type="postback",
                        sender_id_hash=sender_hash,
                        # Fall back to the postback payload as the object id
                        # when the postback carries no "mid".
                        object_id=postback.get("mid") or postback.get("payload"),
                        message_text=postback.get("title"),
                        normalized_payload={
                            "sender_psid": sender_id,
                            "recipient_id": recipient_id,
                            "postback": postback,
                        },
                    )
                )

        for change in entry.get("changes") or []:
            field = change.get("field")
            value = change.get("value") or {}
            if field in {"feed", "comments", "mention"}:
                comment_id = value.get("comment_id")
                post_id = value.get("post_id")
                # A change counts as a comment when it is tagged as one or
                # carries a comment id; otherwise it is a page post.
                is_comment = value.get("item") == "comment" or comment_id
                verb = value.get("verb") or "change"
                events.append(
                    NormalizedInboundEvent(
                        page_context=page_context,
                        platform_page_id=page_id,
                        channel=Channel.comment if is_comment else Channel.page_post,
                        event_type=f"{field}:{verb}",
                        sender_id_hash=hash_sender((value.get("from") or {}).get("id")),
                        object_id=comment_id or post_id or value.get("id"),
                        parent_id=post_id,
                        message_text=value.get("message") or value.get("comment_message"),
                        normalized_payload=value,
                    )
                )
            elif field == "leadgen":
                events.append(
                    NormalizedInboundEvent(
                        page_context=page_context,
                        platform_page_id=page_id,
                        channel=Channel.lead,
                        event_type="leadgen",
                        object_id=value.get("leadgen_id"),
                        normalized_payload=value,
                    )
                )
    return events
|