"""Async client for the Meta Graph API Page endpoints used by this package."""

from collections.abc import AsyncIterator
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any

import httpx

from ..config import get_settings
from ..models import PageContext, PublishMode
from .token_resolver import get_page_credentials


class MetaClientError(RuntimeError):
    """Raised for invalid client-side arguments and Graph API error responses."""


class MetaClient:
    """Thin async wrapper around Graph API calls made on behalf of a Page.

    Each request opens a short-lived ``httpx.AsyncClient`` and authenticates
    with a Page access token resolved (and cached per ``PageContext``) by
    ``_page_access_token``. HTTP responses with status >= 400 are raised as
    ``MetaClientError``.
    """

    def __init__(self) -> None:
        self.settings = get_settings()
        self.base_url = f"https://graph.facebook.com/{self.settings.meta_graph_version}"
        # Page access tokens keyed by page context; filled lazily.
        self._page_access_token_cache: dict[PageContext, str] = {}

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _decode_body(response: httpx.Response) -> dict[str, Any]:
        """Return the JSON body, or ``{"raw": <text>}`` when it is not JSON."""
        try:
            return response.json()
        except ValueError:  # json.JSONDecodeError / UnicodeDecodeError
            return {"raw": response.text}

    @staticmethod
    async def _iter_file_chunks(path: Path, chunk_size: int = 1024 * 1024) -> AsyncIterator[bytes]:
        """Yield the file at *path* in ``chunk_size``-byte pieces.

        ``httpx.AsyncClient`` only accepts ``bytes`` or an *async* iterable as
        ``content``; a plain file object is a sync iterable and is rejected.
        """
        with path.open("rb") as fh:
            while chunk := fh.read(chunk_size):
                yield chunk

    async def _page_access_token(self, page_context: PageContext) -> str:
        """Resolve the Page access token for *page_context*.

        Asks the Graph API for the Page's ``access_token`` field using the
        configured credential token and caches a successful answer. When the
        lookup does not yield a token, the configured token itself is returned
        (uncached).
        """
        creds = get_page_credentials(page_context)
        cached = self._page_access_token_cache.get(page_context)
        if cached:
            return cached

        url = f"{self.base_url}/{creds.page_id}"
        async with httpx.AsyncClient(timeout=20.0) as client:
            response = await client.get(
                url,
                headers={"Authorization": f"Bearer {creds.page_token}"},
                params={"fields": "access_token"},
            )
        try:
            body = response.json()
        except ValueError:
            body = {}

        token = None
        # Guard against a JSON body that is not an object (e.g. a list).
        if response.status_code < 400 and isinstance(body, dict):
            token = body.get("access_token")
        if token:
            self._page_access_token_cache[page_context] = token
            return token
        # Backward-compatible fallback for environments that still provide Page tokens directly.
        return creds.page_token

    async def _request(
        self,
        method: str,
        page_context: PageContext,
        path: str,
        *,
        params: dict[str, Any] | None = None,
        json_payload: dict[str, Any] | None = None,
        data_payload: dict[str, Any] | None = None,
        files: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Send one authenticated Graph API request and return the decoded body.

        Raises:
            MetaClientError: if the response status code is 400 or higher.
        """
        page_access_token = await self._page_access_token(page_context)
        url = f"{self.base_url}/{path.lstrip('/')}"
        async with httpx.AsyncClient(timeout=20.0) as client:
            response = await client.request(
                method,
                url,
                headers={"Authorization": f"Bearer {page_access_token}"},
                params=params,
                json=json_payload,
                data=data_payload,
                files=files,
            )
        body = self._decode_body(response)
        if response.status_code >= 400:
            raise MetaClientError(f"Meta API error {response.status_code} for {path}: {body}")
        return body

    async def _post(
        self,
        page_context: PageContext,
        path: str,
        json_payload: dict[str, Any] | None = None,
        data_payload: dict[str, Any] | None = None,
        files: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """POST to *path* with an optional JSON, form, and/or multipart body."""
        return await self._request(
            "POST",
            page_context,
            path,
            json_payload=json_payload,
            data_payload=data_payload,
            files=files,
        )

    async def _get(
        self,
        page_context: PageContext,
        path: str,
        params: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """GET *path* with optional query parameters."""
        return await self._request("GET", page_context, path, params=params or {})

    async def _delete(
        self,
        page_context: PageContext,
        path: str,
        params: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """DELETE *path* with optional query parameters."""
        return await self._request("DELETE", page_context, path, params=params or {})

    # ------------------------------------------------------------------
    # Payload construction
    # ------------------------------------------------------------------

    def _scheduled_timestamp(self, scheduled_publish_time: datetime | None) -> int:
        """Validate a schedule time and return it as a Unix timestamp.

        Naive datetimes are treated as UTC. The time must lie between
        10 minutes and 75 days from now.

        Raises:
            MetaClientError: if the time is missing or outside that window.
        """
        if scheduled_publish_time is None:
            raise MetaClientError("scheduled_publish_time is required for scheduled Page posts")
        scheduled = scheduled_publish_time
        if scheduled.tzinfo is None:
            scheduled = scheduled.replace(tzinfo=timezone.utc)
        scheduled = scheduled.astimezone(timezone.utc)
        now = datetime.now(timezone.utc)
        if scheduled < now + timedelta(minutes=10):
            raise MetaClientError("scheduled_publish_time must be at least 10 minutes in the future")
        if scheduled > now + timedelta(days=75):
            raise MetaClientError("scheduled_publish_time must be within 75 days")
        return int(scheduled.timestamp())

    def build_page_post_payload(
        self,
        message: str,
        publish_mode: PublishMode = PublishMode.now,
        scheduled_publish_time: datetime | None = None,
    ) -> dict[str, Any]:
        """Build the form payload for a Page feed post.

        Scheduled posts (and validation-only previews that carry a schedule
        time) get ``published=false`` plus a validated
        ``scheduled_publish_time``; all remaining non-validation modes get
        ``published=true``.
        """
        payload: dict[str, Any] = {"message": message}
        # NOTE(review): branch layout reconstructed from whitespace-collapsed
        # source; confirm the final ``else`` belongs to the outer chain.
        if publish_mode == PublishMode.scheduled:
            payload["published"] = "false"
            payload["scheduled_publish_time"] = str(self._scheduled_timestamp(scheduled_publish_time))
        elif publish_mode == PublishMode.validation_only:
            if scheduled_publish_time:
                payload["published"] = "false"
                payload["scheduled_publish_time"] = str(self._scheduled_timestamp(scheduled_publish_time))
        else:
            payload["published"] = "true"
        return payload

    # ------------------------------------------------------------------
    # Read endpoints
    # ------------------------------------------------------------------

    async def list_page_edge(
        self,
        page_context: PageContext,
        edge: str,
        fields: str,
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """GET ``/{page_id}/{edge}`` with the given fields, limit and cursors."""
        creds = get_page_credentials(page_context)
        params: dict[str, Any] = {"fields": fields, "limit": limit}
        if after:
            params["after"] = after
        if before:
            params["before"] = before
        return await self._get(page_context, f"/{creds.page_id}/{edge}", params=params)

    async def list_page_posts(
        self,
        page_context: PageContext,
        edge: str = "posts",
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """List posts from one of the supported Page post edges.

        Raises:
            MetaClientError: if *edge* is not one of ``posts``, ``feed``,
                ``published_posts`` or ``scheduled_posts``.
        """
        allowed_edges = {"posts", "feed", "published_posts", "scheduled_posts"}
        if edge not in allowed_edges:
            raise MetaClientError(f"Unsupported Page post edge: {edge}")
        fields = (
            "id,message,created_time,updated_time,permalink_url,status_type,is_published,"
            "scheduled_publish_time,attachments{media_type,title,url,target}"
        )
        return await self.list_page_edge(page_context, edge, fields, limit, after, before)

    async def list_page_media(
        self,
        page_context: PageContext,
        media_type: str = "all",
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """List Page media for one edge, or for every known edge when ``"all"``.

        Returns ``{"media": {edge: <edge response>, ...}}``. The same
        ``after``/``before`` cursors are forwarded to every requested edge.

        Raises:
            MetaClientError: if *media_type* is not ``all``, ``photos``,
                ``videos`` or ``video_reels``.
        """
        edge_fields = {
            "photos": "id,name,created_time,updated_time,link,images,album",
            "videos": "id,title,description,created_time,updated_time,permalink_url,status",
            "video_reels": "id,title,description,created_time,updated_time,permalink_url,status",
        }
        requested = list(edge_fields) if media_type == "all" else [media_type]
        if any(edge not in edge_fields for edge in requested):
            raise MetaClientError(f"Unsupported media_type: {media_type}")
        results: dict[str, Any] = {}
        for edge in requested:
            results[edge] = await self.list_page_edge(
                page_context, edge, edge_fields[edge], limit, after, before
            )
        return {"media": results}

    async def list_conversations(
        self,
        page_context: PageContext,
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """List the Page's ``conversations`` edge."""
        fields = "id,updated_time,link,message_count,unread_count,participants"
        return await self.list_page_edge(page_context, "conversations", fields, limit, after, before)

    async def get_page_insights(
        self,
        page_context: PageContext,
        metrics: list[str],
        period: str = "day",
        since: str | None = None,
        until: str | None = None,
    ) -> dict[str, Any]:
        """GET ``/{page_id}/insights`` for the comma-joined *metrics*."""
        creds = get_page_credentials(page_context)
        params: dict[str, Any] = {"metric": ",".join(metrics), "period": period}
        if since:
            params["since"] = since
        if until:
            params["until"] = until
        return await self._get(page_context, f"/{creds.page_id}/insights", params=params)

    async def list_webhook_subscriptions(
        self,
        page_context: PageContext,
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """List the Page's ``subscribed_apps`` edge."""
        fields = "id,name,subscribed_fields,category"
        return await self.list_page_edge(page_context, "subscribed_apps", fields, limit, after, before)

    async def list_page_settings(
        self,
        page_context: PageContext,
        limit: int = 25,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """List the Page's ``settings`` edge."""
        fields = "setting,description,value"
        return await self.list_page_edge(page_context, "settings", fields, limit, after, before)

    # ------------------------------------------------------------------
    # Publishing
    # ------------------------------------------------------------------

    async def publish_page_post(
        self,
        page_context: PageContext,
        message: str,
        publish_mode: PublishMode = PublishMode.now,
        scheduled_publish_time: datetime | None = None,
    ) -> dict[str, Any]:
        """Publish (or schedule) a text post on the Page feed.

        In ``validation_only`` mode nothing is sent; the would-be graph path
        and payload are returned instead.
        """
        creds = get_page_credentials(page_context)
        payload = self.build_page_post_payload(message, publish_mode, scheduled_publish_time)
        if publish_mode == PublishMode.validation_only:
            return {
                "publish_attempted": False,
                "graph_path": f"/{creds.page_id}/feed",
                "payload": payload,
            }
        return await self._post(page_context, f"/{creds.page_id}/feed", data_payload=payload)

    async def publish_page_photo_post(
        self,
        page_context: PageContext,
        message: str,
        media: dict[str, Any],
    ) -> dict[str, Any]:
        """Upload one image from ``media["media_path"]`` as a published photo.

        ``media`` may also carry ``mime_type`` and ``alt_text``.
        """
        creds = get_page_credentials(page_context)
        path = Path(media["media_path"])
        # The request is issued inside the ``with`` block so the file handle is
        # still open while the multipart body is being read.
        # NOTE(review): reconstructed from whitespace-collapsed source.
        with path.open("rb") as fh:
            files = {"source": (path.name, fh, media.get("mime_type") or "application/octet-stream")}
            data = {"caption": message, "published": "true"}
            if media.get("alt_text"):
                data["alt_text_custom"] = str(media["alt_text"])
            return await self._post(
                page_context, f"/{creds.page_id}/photos", data_payload=data, files=files
            )

    async def publish_page_post_with_media(
        self,
        page_context: PageContext,
        message: str,
        media_attachments: list[dict[str, Any]],
        publish_mode: PublishMode = PublishMode.now,
        scheduled_publish_time: datetime | None = None,
    ) -> dict[str, Any]:
        """Publish a Page post with optional media.

        Without attachments this delegates to ``publish_page_post``. With
        attachments, only a single image in ``now`` mode is published;
        ``validation_only`` returns a description without sending anything.

        Raises:
            MetaClientError: for scheduled media posts, or for anything other
                than exactly one attachment with ``media_type == "image"``.
        """
        if not media_attachments:
            return await self.publish_page_post(page_context, message, publish_mode, scheduled_publish_time)
        if publish_mode == PublishMode.validation_only:
            return {
                "publish_attempted": False,
                "media_count": len(media_attachments),
                "media_mode": "page_photo",
                "note": "single-image Page photo publish would be used for now-mode media posts",
            }
        if publish_mode == PublishMode.scheduled:
            raise MetaClientError("Scheduled Page posts with media are not enabled yet; use validation_only or now")
        if len(media_attachments) == 1 and media_attachments[0].get("media_type") == "image":
            return await self.publish_page_photo_post(page_context, message, media_attachments[0])
        raise MetaClientError("Only single-image Page posts are currently supported for direct media publishing")

    async def publish_reel(
        self,
        page_context: PageContext,
        video: dict[str, Any],
        description: str,
        title: str | None = None,
    ) -> dict[str, Any]:
        """Upload the video at ``video["media_path"]`` through ``video_reels``.

        Runs the three phases visible here: ``start`` (obtain ``video_id`` and
        ``upload_url``), binary upload to ``upload_url``, then ``finish``.
        Returns the video id together with all three response bodies.

        Raises:
            MetaClientError: if the start response lacks the upload fields or
                any phase answers with status >= 400.
        """
        creds = get_page_credentials(page_context)
        # Inspect the local file first so a missing file does not leave an
        # orphaned upload session behind.
        path = Path(video["media_path"])
        file_size = path.stat().st_size

        start = await self._post(
            page_context, f"/{creds.page_id}/video_reels", data_payload={"upload_phase": "start"}
        )
        video_id = start.get("video_id")
        upload_url = start.get("upload_url")
        if not video_id or not upload_url:
            raise MetaClientError(f"Meta Reels start response missing upload fields: {start}")

        page_access_token = await self._page_access_token(page_context)
        async with httpx.AsyncClient(timeout=300.0) as client:
            upload_response = await client.post(
                upload_url,
                headers={
                    "Authorization": f"OAuth {page_access_token}",
                    # Sent alongside file_size, following the Meta Reels upload
                    # example as recalled. NOTE(review): confirm against current docs.
                    "offset": "0",
                    "file_size": str(file_size),
                    # Explicit length keeps httpx from switching the streamed
                    # body to chunked transfer encoding.
                    "Content-Length": str(file_size),
                },
                # AsyncClient rejects sync file objects as content, so stream
                # the file through an async generator instead.
                content=self._iter_file_chunks(path),
            )
        upload_body = self._decode_body(upload_response)
        if upload_response.status_code >= 400:
            raise MetaClientError(f"Meta Reels upload error {upload_response.status_code}: {upload_body}")

        # NOTE(review): no ``video_state`` is sent with the finish phase;
        # confirm the API default matches the intended publish behaviour.
        data = {"upload_phase": "finish", "video_id": video_id, "description": description}
        if title:
            data["title"] = title
        finish = await self._post(page_context, f"/{creds.page_id}/video_reels", data_payload=data)
        return {"video_id": video_id, "start": start, "upload": upload_body, "finish": finish}

    # ------------------------------------------------------------------
    # Comments
    # ------------------------------------------------------------------

    async def reply_to_comment(
        self,
        page_context: PageContext,
        comment_id: str,
        message: str,
    ) -> dict[str, Any]:
        """POST *message* to ``/{comment_id}/comments``."""
        return await self._post(page_context, f"/{comment_id}/comments", data_payload={"message": message})

    async def read_comment_thread(
        self,
        page_context: PageContext,
        object_id: str,
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """GET ``/{object_id}/comments`` with a fixed field set and cursors."""
        params: dict[str, Any] = {
            "fields": (
                "id,message,created_time,from,can_comment,can_hide,can_like,"
                "comment_count,is_hidden,parent{id}"
            ),
            "limit": limit,
        }
        if after:
            params["after"] = after
        if before:
            params["before"] = before
        return await self._get(page_context, f"/{object_id}/comments", params=params)

    async def moderate_comment(
        self,
        page_context: PageContext,
        comment_id: str,
        action: str,
    ) -> dict[str, Any]:
        """Hide, unhide or delete a comment.

        Raises:
            MetaClientError: if *action* is not ``hide``, ``unhide`` or
                ``delete``.
        """
        if action == "hide":
            return await self._post(page_context, f"/{comment_id}", data_payload={"is_hidden": "true"})
        if action == "unhide":
            return await self._post(page_context, f"/{comment_id}", data_payload={"is_hidden": "false"})
        if action == "delete":
            return await self._delete(page_context, f"/{comment_id}")
        raise MetaClientError(f"Unsupported comment moderation action: {action}")

    # ------------------------------------------------------------------
    # Messenger
    # ------------------------------------------------------------------

    async def send_messenger_message(
        self,
        page_context: PageContext,
        recipient_psid: str,
        message: str,
        messaging_type: str = "RESPONSE",
    ) -> dict[str, Any]:
        """POST a text message as JSON to ``/{page_id}/messages``."""
        creds = get_page_credentials(page_context)
        payload = {
            "recipient": {"id": recipient_psid},
            "messaging_type": messaging_type,
            "message": {"text": message},
        }
        return await self._post(page_context, f"/{creds.page_id}/messages", json_payload=payload)

    async def read_conversation_detail(
        self,
        page_context: PageContext,
        conversation_id: str,
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """GET a conversation with up to *limit* nested messages."""
        message_fields = "id,message,created_time,from,to"
        params: dict[str, Any] = {
            "fields": (
                "id,updated_time,link,message_count,unread_count,participants,"
                f"messages.limit({limit}){{{message_fields}}}"
            )
        }
        if after:
            params["after"] = after
        if before:
            params["before"] = before
        return await self._get(page_context, f"/{conversation_id}", params=params)

    async def messenger_response_window(
        self,
        page_context: PageContext,
        conversation_id: str,
    ) -> dict[str, Any]:
        """Summarise a conversation's latest message time (read-only).

        Fetches up to five messages and reports the ``created_time`` of the
        first one returned, plus the conversation's ``message_count``.
        """
        detail = await self.read_conversation_detail(page_context, conversation_id, limit=5)
        messages = detail.get("messages", {}).get("data", []) if isinstance(detail, dict) else []
        # Treats the first returned message as the newest.
        # NOTE(review): presumably the API returns newest-first; confirm.
        newest = messages[0] if messages else None
        return {
            "conversation_id": conversation_id,
            "latest_message_time": newest.get("created_time") if isinstance(newest, dict) else None,
            "message_count": detail.get("message_count") if isinstance(detail, dict) else None,
            "response_window_status": "inspect_latest_message_time",
            "note": "Policy-window eligibility depends on the latest inbound user message and allowed messaging_type/tag; this endpoint is read-only.",
        }