from collections.abc import AsyncIterator
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any

import httpx

from ..config import get_settings
from ..models import PageContext, PublishMode
from .token_resolver import get_page_credentials
|
|
class MetaClientError(RuntimeError):
    """Error raised by the Meta client.

    Used both for Graph API responses with an HTTP status >= 400 and for
    requests that are rejected locally before any network call is made
    (for example an unsupported edge or an invalid scheduling time).
    """
|
|
class MetaClient:
    """Async client for the Page-related Graph API endpoints used by this app.

    Every request is sent to ``https://graph.facebook.com/<version>`` with a
    bearer token obtained through :meth:`_page_access_token`.  Responses with
    an HTTP status >= 400 are raised as :class:`MetaClientError`.
    """

    # Timeout (seconds) for ordinary Graph API calls.
    _REQUEST_TIMEOUT = 20.0
    # Timeout (seconds) for the binary Reels upload, which can be much slower.
    _UPLOAD_TIMEOUT = 300.0
    # Chunk size used when streaming a video file to the upload endpoint.
    _UPLOAD_CHUNK_SIZE = 1024 * 1024
    # Accepted scheduling window, measured from "now" at validation time.
    _MIN_SCHEDULE_LEAD = timedelta(minutes=10)
    _MAX_SCHEDULE_LEAD = timedelta(days=75)

    def __init__(self) -> None:
        self.settings = get_settings()
        self.base_url = f"https://graph.facebook.com/{self.settings.meta_graph_version}"
        # Tokens returned by the Graph API, keyed by page context, so the
        # lookup request is made at most once per context per client.
        self._page_access_token_cache: dict[PageContext, str] = {}

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _decode_body(response: httpx.Response) -> dict[str, Any]:
        """Return the JSON body of *response*, or ``{"raw": text}`` if it is not JSON."""
        try:
            return response.json()
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError.
            return {"raw": response.text}

    @staticmethod
    def _with_cursors(params: dict[str, Any], after: str | None, before: str | None) -> dict[str, Any]:
        """Add the ``after`` / ``before`` paging cursors to *params* when given."""
        if after:
            params["after"] = after
        if before:
            params["before"] = before
        return params

    @staticmethod
    async def _iter_file_chunks(path: Path, chunk_size: int) -> AsyncIterator[bytes]:
        """Yield the contents of *path* in chunks of at most *chunk_size* bytes.

        ``httpx.AsyncClient`` only accepts bytes or an *async* iterable as
        request content, so a plain file object cannot be passed directly.
        The reads themselves are ordinary blocking file reads.
        """
        with path.open("rb") as fh:
            while chunk := fh.read(chunk_size):
                yield chunk

    async def _page_access_token(self, page_context: PageContext) -> str:
        """Return the access token to use for *page_context*.

        The page node is requested with ``fields=access_token`` using the
        configured token.  If the response carries a token it is cached and
        returned; otherwise (error status, non-JSON body, or no token in the
        body) the configured ``page_token`` is returned unchanged and nothing
        is cached.  Transport errors from httpx propagate to the caller.
        """
        cached = self._page_access_token_cache.get(page_context)
        if cached:
            return cached

        creds = get_page_credentials(page_context)
        async with httpx.AsyncClient(timeout=self._REQUEST_TIMEOUT) as client:
            response = await client.get(
                f"{self.base_url}/{creds.page_id}",
                headers={"Authorization": f"Bearer {creds.page_token}"},
                params={"fields": "access_token"},
            )

        token = None
        if response.status_code < 400:
            try:
                body = response.json()
            except ValueError:
                body = None
            # Guard against a JSON body that is not an object.
            if isinstance(body, dict):
                token = body.get("access_token")

        if token:
            self._page_access_token_cache[page_context] = token
            return token
        return creds.page_token

    async def _request(
        self,
        method: str,
        page_context: PageContext,
        path: str,
        *,
        params: dict[str, Any] | None = None,
        json_payload: dict[str, Any] | None = None,
        data_payload: dict[str, Any] | None = None,
        files: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Send one authenticated Graph API request and return the decoded body.

        Raises:
            MetaClientError: if the response status is >= 400.
        """
        page_access_token = await self._page_access_token(page_context)
        url = f"{self.base_url}/{path.lstrip('/')}"
        async with httpx.AsyncClient(timeout=self._REQUEST_TIMEOUT) as client:
            response = await client.request(
                method,
                url,
                headers={"Authorization": f"Bearer {page_access_token}"},
                params=params,
                json=json_payload,
                data=data_payload,
                files=files,
            )
        body = self._decode_body(response)
        if response.status_code >= 400:
            raise MetaClientError(f"Meta API error {response.status_code} for {path}: {body}")
        return body

    async def _post(
        self,
        page_context: PageContext,
        path: str,
        json_payload: dict[str, Any] | None = None,
        data_payload: dict[str, Any] | None = None,
        files: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """POST to *path* with an optional JSON, form, and/or multipart payload."""
        return await self._request(
            "POST",
            page_context,
            path,
            json_payload=json_payload,
            data_payload=data_payload,
            files=files,
        )

    async def _get(self, page_context: PageContext, path: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
        """GET *path* with optional query parameters."""
        return await self._request("GET", page_context, path, params=params or {})

    async def _delete(self, page_context: PageContext, path: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
        """DELETE *path* with optional query parameters."""
        return await self._request("DELETE", page_context, path, params=params or {})

    # ------------------------------------------------------------------
    # Payload construction
    # ------------------------------------------------------------------

    def _scheduled_timestamp(self, scheduled_publish_time: datetime | None) -> int:
        """Validate *scheduled_publish_time* and return it as a Unix timestamp.

        A naive datetime is interpreted as UTC.  The time must lie between
        10 minutes and 75 days from the moment of the call.

        Raises:
            MetaClientError: if the time is missing or outside that window.
        """
        if scheduled_publish_time is None:
            raise MetaClientError("scheduled_publish_time is required for scheduled Page posts")
        scheduled = scheduled_publish_time
        if scheduled.tzinfo is None:
            scheduled = scheduled.replace(tzinfo=timezone.utc)
        scheduled = scheduled.astimezone(timezone.utc)
        now = datetime.now(timezone.utc)
        if scheduled < now + self._MIN_SCHEDULE_LEAD:
            raise MetaClientError("scheduled_publish_time must be at least 10 minutes in the future")
        if scheduled > now + self._MAX_SCHEDULE_LEAD:
            raise MetaClientError("scheduled_publish_time must be within 75 days")
        return int(scheduled.timestamp())

    def build_page_post_payload(
        self,
        message: str,
        publish_mode: PublishMode = PublishMode.now,
        scheduled_publish_time: datetime | None = None,
    ) -> dict[str, Any]:
        """Build the form payload for a Page feed post.

        * ``scheduled``: unpublished, with a validated ``scheduled_publish_time``.
        * ``validation_only``: same as ``scheduled`` when a time is supplied,
          otherwise only the message is included.
        * any other mode: ``published`` is ``"true"``.

        Raises:
            MetaClientError: if a schedule is required or supplied but invalid.
        """
        payload: dict[str, Any] = {"message": message}
        if publish_mode == PublishMode.validation_only and not scheduled_publish_time:
            return payload
        if publish_mode in (PublishMode.scheduled, PublishMode.validation_only):
            payload["published"] = "false"
            payload["scheduled_publish_time"] = str(self._scheduled_timestamp(scheduled_publish_time))
        else:
            payload["published"] = "true"
        return payload

    # ------------------------------------------------------------------
    # Read endpoints
    # ------------------------------------------------------------------

    async def list_page_edge(
        self,
        page_context: PageContext,
        edge: str,
        fields: str,
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """GET ``/<page_id>/<edge>`` with the given fields, limit and cursors."""
        creds = get_page_credentials(page_context)
        params = self._with_cursors({"fields": fields, "limit": limit}, after, before)
        return await self._get(page_context, f"/{creds.page_id}/{edge}", params=params)

    async def list_page_posts(
        self,
        page_context: PageContext,
        edge: str = "posts",
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """List posts from one of the supported Page post edges.

        Raises:
            MetaClientError: if *edge* is not one of ``posts``, ``feed``,
                ``published_posts`` or ``scheduled_posts``.
        """
        allowed_edges = {"posts", "feed", "published_posts", "scheduled_posts"}
        if edge not in allowed_edges:
            raise MetaClientError(f"Unsupported Page post edge: {edge}")
        fields = "id,message,created_time,updated_time,permalink_url,status_type,is_published,scheduled_publish_time,attachments{media_type,title,url,target}"
        return await self.list_page_edge(page_context, edge, fields, limit, after, before)

    async def list_page_media(
        self,
        page_context: PageContext,
        media_type: str = "all",
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """List Page media for one edge, or for all supported edges.

        *media_type* is ``"photos"``, ``"videos"``, ``"video_reels"`` or
        ``"all"``.  The result is ``{"media": {<edge>: <response>, ...}}``.

        Raises:
            MetaClientError: if *media_type* is not supported.
        """
        edge_fields = {
            "photos": "id,name,created_time,updated_time,link,images,album",
            "videos": "id,title,description,created_time,updated_time,permalink_url,status",
            "video_reels": "id,title,description,created_time,updated_time,permalink_url,status",
        }
        requested = list(edge_fields) if media_type == "all" else [media_type]
        if any(edge not in edge_fields for edge in requested):
            raise MetaClientError(f"Unsupported media_type: {media_type}")
        # NOTE(review): with media_type="all" the same after/before cursor is
        # sent to every edge; cursors are presumably edge-specific, so paging
        # in "all" mode should be confirmed against the API.
        results: dict[str, Any] = {}
        for edge in requested:
            results[edge] = await self.list_page_edge(page_context, edge, edge_fields[edge], limit, after, before)
        return {"media": results}

    async def list_conversations(
        self,
        page_context: PageContext,
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """List the Page's ``conversations`` edge."""
        fields = "id,updated_time,link,message_count,unread_count,participants"
        return await self.list_page_edge(page_context, "conversations", fields, limit, after, before)

    async def get_page_insights(
        self,
        page_context: PageContext,
        metrics: list[str],
        period: str = "day",
        since: str | None = None,
        until: str | None = None,
    ) -> dict[str, Any]:
        """GET ``/<page_id>/insights`` for the comma-joined *metrics*."""
        creds = get_page_credentials(page_context)
        params: dict[str, Any] = {"metric": ",".join(metrics), "period": period}
        if since:
            params["since"] = since
        if until:
            params["until"] = until
        return await self._get(page_context, f"/{creds.page_id}/insights", params=params)

    async def list_webhook_subscriptions(
        self,
        page_context: PageContext,
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """List the Page's ``subscribed_apps`` edge."""
        fields = "id,name,subscribed_fields,category"
        return await self.list_page_edge(page_context, "subscribed_apps", fields, limit, after, before)

    async def list_page_settings(
        self,
        page_context: PageContext,
        limit: int = 25,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """List the Page's ``settings`` edge."""
        fields = "setting,description,value"
        return await self.list_page_edge(page_context, "settings", fields, limit, after, before)

    # ------------------------------------------------------------------
    # Publishing
    # ------------------------------------------------------------------

    async def publish_page_post(
        self,
        page_context: PageContext,
        message: str,
        publish_mode: PublishMode = PublishMode.now,
        scheduled_publish_time: datetime | None = None,
    ) -> dict[str, Any]:
        """Publish (or schedule) a text post on the Page feed.

        In ``validation_only`` mode nothing is sent; the payload that would
        have been posted is returned together with the target path.
        """
        creds = get_page_credentials(page_context)
        graph_path = f"/{creds.page_id}/feed"
        payload = self.build_page_post_payload(message, publish_mode, scheduled_publish_time)
        if publish_mode == PublishMode.validation_only:
            return {"publish_attempted": False, "graph_path": graph_path, "payload": payload}
        return await self._post(page_context, graph_path, data_payload=payload)

    async def publish_page_photo_post(self, page_context: PageContext, message: str, media: dict[str, Any]) -> dict[str, Any]:
        """Upload a single photo to ``/<page_id>/photos`` with *message* as caption.

        *media* must contain ``media_path``; ``mime_type`` and ``alt_text``
        are optional.
        """
        creds = get_page_credentials(page_context)
        path = Path(media["media_path"])
        data = {"caption": message, "published": "true"}
        if media.get("alt_text"):
            data["alt_text_custom"] = str(media["alt_text"])
        # The file must stay open until the multipart request has been sent.
        with path.open("rb") as fh:
            files = {"source": (path.name, fh, media.get("mime_type") or "application/octet-stream")}
            return await self._post(page_context, f"/{creds.page_id}/photos", data_payload=data, files=files)

    async def publish_page_post_with_media(
        self,
        page_context: PageContext,
        message: str,
        media_attachments: list[dict[str, Any]],
        publish_mode: PublishMode = PublishMode.now,
        scheduled_publish_time: datetime | None = None,
    ) -> dict[str, Any]:
        """Publish a Page post, using the photo endpoint when media is attached.

        Without attachments this delegates to :meth:`publish_page_post`.
        With attachments only a single image in ``now`` mode is supported.

        Raises:
            MetaClientError: for scheduled media posts, or for anything other
                than exactly one attachment with ``media_type == "image"``.
        """
        if not media_attachments:
            return await self.publish_page_post(page_context, message, publish_mode, scheduled_publish_time)
        if publish_mode == PublishMode.validation_only:
            return {"publish_attempted": False, "media_count": len(media_attachments), "media_mode": "page_photo", "note": "single-image Page photo publish would be used for now-mode media posts"}
        if publish_mode == PublishMode.scheduled:
            raise MetaClientError("Scheduled Page posts with media are not enabled yet; use validation_only or now")
        if len(media_attachments) == 1 and media_attachments[0].get("media_type") == "image":
            return await self.publish_page_photo_post(page_context, message, media_attachments[0])
        raise MetaClientError("Only single-image Page posts are currently supported for direct media publishing")

    async def publish_reel(
        self,
        page_context: PageContext,
        video: dict[str, Any],
        description: str,
        title: str | None = None,
    ) -> dict[str, Any]:
        """Publish a Reel in three phases: start, binary upload, finish.

        *video* must contain ``media_path``.  Returns the video id together
        with the raw responses of all three phases.

        Raises:
            MetaClientError: if the start response lacks ``video_id`` or
                ``upload_url``, or if any phase returns a status >= 400.
        """
        creds = get_page_credentials(page_context)
        reels_path = f"/{creds.page_id}/video_reels"

        start = await self._post(page_context, reels_path, data_payload={"upload_phase": "start"})
        video_id = start.get("video_id")
        upload_url = start.get("upload_url")
        if not video_id or not upload_url:
            raise MetaClientError(f"Meta Reels start response missing upload fields: {start}")

        page_access_token = await self._page_access_token(page_context)
        path = Path(video["media_path"])
        file_size = path.stat().st_size
        upload_headers = {
            "Authorization": f"OAuth {page_access_token}",
            # The whole file is sent in one request, so the offset is 0.
            "offset": "0",
            "file_size": str(file_size),
            # Set explicitly so the streamed body is not sent chunked.
            "Content-Length": str(file_size),
        }
        async with httpx.AsyncClient(timeout=self._UPLOAD_TIMEOUT) as client:
            # AsyncClient rejects a synchronous file object as content, so
            # the file is streamed through an async generator instead.
            upload_response = await client.post(
                upload_url,
                headers=upload_headers,
                content=self._iter_file_chunks(path, self._UPLOAD_CHUNK_SIZE),
            )
        upload_body = self._decode_body(upload_response)
        if upload_response.status_code >= 400:
            raise MetaClientError(f"Meta Reels upload error {upload_response.status_code}: {upload_body}")

        data = {
            "upload_phase": "finish",
            "video_id": video_id,
            # Ask for the reel to be published rather than relying on the
            # endpoint's default state.
            "video_state": "PUBLISHED",
            "description": description,
        }
        if title:
            data["title"] = title
        finish = await self._post(page_context, reels_path, data_payload=data)
        return {"video_id": video_id, "start": start, "upload": upload_body, "finish": finish}

    # ------------------------------------------------------------------
    # Comments
    # ------------------------------------------------------------------

    async def reply_to_comment(self, page_context: PageContext, comment_id: str, message: str) -> dict[str, Any]:
        """POST *message* to ``/<comment_id>/comments``."""
        return await self._post(page_context, f"/{comment_id}/comments", data_payload={"message": message})

    async def read_comment_thread(
        self,
        page_context: PageContext,
        object_id: str,
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """GET ``/<object_id>/comments`` with a fixed field set and paging."""
        params = self._with_cursors(
            {
                "fields": "id,message,created_time,from,can_comment,can_hide,can_like,comment_count,is_hidden,parent{id}",
                "limit": limit,
            },
            after,
            before,
        )
        return await self._get(page_context, f"/{object_id}/comments", params=params)

    async def moderate_comment(self, page_context: PageContext, comment_id: str, action: str) -> dict[str, Any]:
        """Hide, unhide or delete a comment.

        Raises:
            MetaClientError: if *action* is not ``hide``, ``unhide`` or ``delete``.
        """
        if action in ("hide", "unhide"):
            is_hidden = "true" if action == "hide" else "false"
            return await self._post(page_context, f"/{comment_id}", data_payload={"is_hidden": is_hidden})
        if action == "delete":
            return await self._delete(page_context, f"/{comment_id}")
        raise MetaClientError(f"Unsupported comment moderation action: {action}")

    # ------------------------------------------------------------------
    # Messenger
    # ------------------------------------------------------------------

    async def send_messenger_message(
        self,
        page_context: PageContext,
        recipient_psid: str,
        message: str,
        messaging_type: str = "RESPONSE",
    ) -> dict[str, Any]:
        """POST a text message to ``/<page_id>/messages`` as a JSON body."""
        creds = get_page_credentials(page_context)
        payload = {"recipient": {"id": recipient_psid}, "messaging_type": messaging_type, "message": {"text": message}}
        return await self._post(page_context, f"/{creds.page_id}/messages", json_payload=payload)

    async def read_conversation_detail(
        self,
        page_context: PageContext,
        conversation_id: str,
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """GET a conversation node with up to *limit* nested messages."""
        message_fields = "id,message,created_time,from,to"
        fields = f"id,updated_time,link,message_count,unread_count,participants,messages.limit({limit}){{{message_fields}}}"
        # NOTE(review): after/before are sent as top-level query parameters on
        # a node request; confirm they actually page the nested messages edge.
        params = self._with_cursors({"fields": fields}, after, before)
        return await self._get(page_context, f"/{conversation_id}", params=params)

    async def messenger_response_window(self, page_context: PageContext, conversation_id: str) -> dict[str, Any]:
        """Return a read-only summary used to judge the Messenger response window.

        Fetches the conversation with up to five messages and reports the
        ``created_time`` of the first message in the response (presumably
        the newest; TODO confirm ordering) plus the conversation's message count.
        """
        detail = await self.read_conversation_detail(page_context, conversation_id, limit=5)
        is_mapping = isinstance(detail, dict)
        # Tolerate a missing or null "messages" / "data" entry.
        messages = ((detail.get("messages") or {}).get("data") or []) if is_mapping else []
        newest = messages[0] if messages else None
        return {
            "conversation_id": conversation_id,
            "latest_message_time": newest.get("created_time") if isinstance(newest, dict) else None,
            "message_count": detail.get("message_count") if is_mapping else None,
            "response_window_status": "inspect_latest_message_time",
            "note": "Policy-window eligibility depends on the latest inbound user message and allowed messaging_type/tag; this endpoint is read-only.",
        }
|
|