| import hashlib |
| from typing import Any |
| from ..config import get_settings |
| from ..models import PageContext, Channel |
| from ..schemas import NormalizedInboundEvent |
|
|
| def hash_sender(sender_id: str | None) -> str | None: |
| return hashlib.sha256(sender_id.encode("utf-8")).hexdigest() if sender_id else None |
|
|
def resolve_page_context(platform_page_id: str | None) -> PageContext:
    """Map a platform page id onto its configured ``PageContext``.

    The id is compared, as a string, against ``settings.page_env(slug, "id")``
    for every slug yielded by ``settings.page_contexts()``; the first slug
    whose configured id is equal wins.

    Args:
        platform_page_id: Page id to look up. A falsy value (``None``, ``""``)
            is compared as the empty string.

    Returns:
        ``PageContext(slug)`` for the first matching slug.

    Raises:
        ValueError: If no configured slug has an id equal to the lookup key.
    """
    settings = get_settings()
    lookup_key = str(platform_page_id or "")

    # NOTE(review): an empty lookup key would match a slug whose configured id
    # is also an empty string -- confirm page_env() cannot return "" here.
    no_match = object()
    matched_slug = next(
        (
            slug
            for slug in settings.page_contexts()
            if lookup_key == settings.page_env(slug, "id")
        ),
        no_match,
    )
    if matched_slug is no_match:
        raise ValueError(f"Unrecognized platform_page_id: {platform_page_id}")
    return PageContext(matched_slug)
|
|
def _messaging_events(
    messaging: dict[str, Any],
    page_context: PageContext,
    page_id: str,
) -> list[NormalizedInboundEvent]:
    """Build the message and/or postback events carried by one messaging item."""
    message = messaging.get("message") or {}
    postback = messaging.get("postback") or {}
    if not (message or postback):
        return []

    # ``or {}`` (rather than a ``.get`` default) also covers a key that is
    # present but explicitly null, which would otherwise raise AttributeError.
    sender_id = (messaging.get("sender") or {}).get("id")
    recipient_id = (messaging.get("recipient") or {}).get("id")
    sender_hash = hash_sender(sender_id)
    # Fall back to the recipient id when the entry carried no page id.
    platform_page_id = page_id or recipient_id

    events: list[NormalizedInboundEvent] = []
    if message:
        events.append(
            NormalizedInboundEvent(
                page_context=page_context,
                platform_page_id=platform_page_id,
                channel=Channel.messenger,
                event_type="message",
                sender_id_hash=sender_hash,
                object_id=message.get("mid"),
                message_text=message.get("text"),
                # NOTE(review): the raw sender id is kept here even though
                # sender_id_hash stores only a digest -- confirm intended.
                normalized_payload={
                    "sender_psid": sender_id,
                    "recipient_id": recipient_id,
                    "message": message,
                },
            )
        )
    if postback:
        events.append(
            NormalizedInboundEvent(
                page_context=page_context,
                platform_page_id=platform_page_id,
                channel=Channel.messenger,
                event_type="postback",
                sender_id_hash=sender_hash,
                # Use the postback payload as the object id when no mid exists.
                object_id=postback.get("mid") or postback.get("payload"),
                message_text=postback.get("title"),
                normalized_payload={
                    "sender_psid": sender_id,
                    "recipient_id": recipient_id,
                    "postback": postback,
                },
            )
        )
    return events


def _change_events(
    change: dict[str, Any],
    page_context: PageContext,
    page_id: str,
) -> list[NormalizedInboundEvent]:
    """Build the event (if any) for one ``changes`` item of an entry."""
    field = change.get("field")
    value = change.get("value") or {}

    if field in {"feed", "comments", "mention"}:
        comment_id = value.get("comment_id")
        post_id = value.get("post_id")
        # Anything flagged as a comment, or carrying a comment id, is a
        # comment event; everything else is treated as a page post.
        is_comment = value.get("item") == "comment" or comment_id
        verb = value.get("verb") or "change"
        return [
            NormalizedInboundEvent(
                page_context=page_context,
                platform_page_id=page_id,
                channel=Channel.comment if is_comment else Channel.page_post,
                event_type=f"{field}:{verb}",
                sender_id_hash=hash_sender((value.get("from") or {}).get("id")),
                object_id=comment_id or post_id or value.get("id"),
                parent_id=post_id,
                message_text=value.get("message") or value.get("comment_message"),
                normalized_payload=value,
            )
        ]
    elif field == "leadgen":
        return [
            NormalizedInboundEvent(
                page_context=page_context,
                platform_page_id=page_id,
                channel=Channel.lead,
                event_type="leadgen",
                object_id=value.get("leadgen_id"),
                normalized_payload=value,
            )
        ]
    # Any other field is ignored.
    return []


def normalize_meta_payload(payload: dict[str, Any]) -> list[NormalizedInboundEvent]:
    """Flatten a webhook payload into a list of ``NormalizedInboundEvent``.

    Each item of ``payload["entry"]`` is resolved to a page context via
    ``resolve_page_context`` and then scanned for:

    * ``messaging`` items, which yield a ``"message"`` event and/or a
      ``"postback"`` event on ``Channel.messenger``;
    * ``changes`` items whose ``field`` is ``feed``/``comments``/``mention``
      (comment or page-post events named ``"<field>:<verb>"``) or
      ``leadgen`` (lead events). Other fields are ignored.

    Missing or null ``entry``, ``messaging``, ``changes``, ``sender``,
    ``recipient``, ``message``, ``postback`` and ``value`` members are treated
    as empty.

    Args:
        payload: Decoded webhook body.

    Returns:
        Events in payload order; within an entry, messaging events precede
        change events. Empty when the payload has no entries.

    Raises:
        ValueError: Propagated from ``resolve_page_context`` when an entry's
            page id is not recognized.
    """
    events: list[NormalizedInboundEvent] = []
    for entry in payload.get("entry") or []:
        page_id = str(entry.get("id", ""))
        page_context = resolve_page_context(page_id)
        for messaging in entry.get("messaging") or []:
            events.extend(_messaging_events(messaging, page_context, page_id))
        for change in entry.get("changes") or []:
            events.extend(_change_events(change, page_context, page_id))
    return events
|
|