File size: 16,002 Bytes
0e84a1f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
from pathlib import Path
from typing import Any
from datetime import datetime, timedelta, timezone
import httpx
from ..config import get_settings
from ..models import PageContext, PublishMode
from .token_resolver import get_page_credentials

class MetaClientError(RuntimeError):
    """Raised by MetaClient for rejected input and for Graph API error responses."""

class MetaClient:
    def __init__(self) -> None:
        """Load settings, derive the versioned Graph API base URL, and start with an empty token cache."""
        settings = get_settings()
        # Page access tokens resolved by _page_access_token, keyed by page context.
        self._page_access_token_cache: dict[PageContext, str] = {}
        self.settings = settings
        self.base_url = f"https://graph.facebook.com/{settings.meta_graph_version}"

    async def _page_access_token(self, page_context: PageContext) -> str:
        """Return the access token to use for requests made on behalf of ``page_context``.

        The configured credential is used to request the ``access_token`` field
        of the Page node. A token obtained this way is cached per
        ``page_context`` on this client instance and reused on later calls.

        If the lookup does not produce a usable token (error status, a body
        that is not JSON, a body that is not a JSON object, or a missing /
        non-string ``access_token``), the configured token is returned as-is
        and nothing is cached, so the lookup is retried on the next call.

        Args:
            page_context: Identifies which Page's credentials to use.

        Returns:
            The cached or freshly resolved token, or the configured token as a
            fallback.
        """
        creds = get_page_credentials(page_context)
        cached = self._page_access_token_cache.get(page_context)
        if cached:
            return cached

        async with httpx.AsyncClient(timeout=20.0) as client:
            response = await client.get(
                f"{self.base_url}/{creds.page_id}",
                headers={"Authorization": f"Bearer {creds.page_token}"},
                params={"fields": "access_token"},
            )

        token = None
        if response.status_code < 400:
            try:
                body = response.json()
            except ValueError:
                # JSON decoding errors are ValueError subclasses; treat an
                # undecodable body the same as "no token returned".
                body = None
            # Only a JSON object can carry the field; a list/str/null body
            # must fall through to the fallback instead of raising.
            if isinstance(body, dict):
                candidate = body.get("access_token")
                if isinstance(candidate, str) and candidate:
                    token = candidate

        if token:
            self._page_access_token_cache[page_context] = token
            return token
        # Backward-compatible fallback for environments that still provide Page tokens directly.
        return creds.page_token

    async def _post(
        self,
        page_context: PageContext,
        path: str,
        json_payload: dict[str, Any] | None = None,
        data_payload: dict[str, Any] | None = None,
        files: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """POST to a Graph API path using the Page access token.

        Args:
            page_context: Identifies the Page whose token authorizes the call.
            path: Graph path relative to the versioned base URL; leading
                slashes are ignored.
            json_payload: Passed to httpx as ``json=``.
            data_payload: Passed to httpx as ``data=``.
            files: Passed to httpx as ``files=``.

        Returns:
            The decoded JSON body, or ``{"raw": <response text>}`` when the
            body cannot be decoded.

        Raises:
            MetaClientError: If the response status is 400 or higher.
        """
        token = await self._page_access_token(page_context)
        endpoint = f"{self.base_url}/{path.lstrip('/')}"
        auth_headers = {"Authorization": f"Bearer {token}"}
        async with httpx.AsyncClient(timeout=20.0) as client:
            response = await client.post(
                endpoint,
                headers=auth_headers,
                json=json_payload,
                data=data_payload,
                files=files,
            )
        try:
            decoded = response.json()
        except Exception:
            # Keep the raw text so error messages still show what came back.
            decoded = {"raw": response.text}
        if response.status_code < 400:
            return decoded
        raise MetaClientError(f"Meta API error {response.status_code} for {path}: {decoded}")

    def _scheduled_timestamp(self, scheduled_publish_time: datetime | None) -> int:
        """Validate a scheduled publish time and convert it to a Unix timestamp.

        A datetime without ``tzinfo`` is interpreted as UTC. The time must lie
        between 10 minutes and 75 days after the current UTC time.

        Args:
            scheduled_publish_time: Requested publish time.

        Returns:
            Whole seconds since the epoch for the requested time.

        Raises:
            MetaClientError: If the time is missing or outside the allowed
                window.
        """
        if scheduled_publish_time is None:
            raise MetaClientError("scheduled_publish_time is required for scheduled Page posts")

        if scheduled_publish_time.tzinfo is None:
            when = scheduled_publish_time.replace(tzinfo=timezone.utc)
        else:
            when = scheduled_publish_time
        when = when.astimezone(timezone.utc)

        current = datetime.now(timezone.utc)
        earliest_allowed = current + timedelta(minutes=10)
        latest_allowed = current + timedelta(days=75)
        if when < earliest_allowed:
            raise MetaClientError("scheduled_publish_time must be at least 10 minutes in the future")
        if when > latest_allowed:
            raise MetaClientError("scheduled_publish_time must be within 75 days")
        return int(when.timestamp())

    def build_page_post_payload(
        self,
        message: str,
        publish_mode: PublishMode = PublishMode.now,
        scheduled_publish_time: datetime | None = None,
    ) -> dict[str, Any]:
        """Build the form payload for a Page feed post.

        The payload always contains ``message``. In ``scheduled`` mode, and in
        ``validation_only`` mode when a publish time is supplied, it also
        carries ``published="false"`` plus the validated
        ``scheduled_publish_time`` as a string. ``validation_only`` without a
        publish time sets ``published="true"``. Any other mode adds nothing.

        Raises:
            MetaClientError: If a schedule is required or supplied and the
                publish time fails validation.
        """
        is_validation = publish_mode == PublishMode.validation_only
        wants_schedule = publish_mode == PublishMode.scheduled or (
            is_validation and bool(scheduled_publish_time)
        )

        payload: dict[str, Any] = {"message": message}
        if wants_schedule:
            payload["published"] = "false"
            payload["scheduled_publish_time"] = str(self._scheduled_timestamp(scheduled_publish_time))
        elif is_validation:
            payload["published"] = "true"
        return payload

    async def _get(
        self,
        page_context: PageContext,
        path: str,
        params: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """GET a Graph API path using the Page access token.

        Args:
            page_context: Identifies the Page whose token authorizes the call.
            path: Graph path relative to the versioned base URL; leading
                slashes are ignored.
            params: Query parameters; an empty mapping is sent when omitted.

        Returns:
            The decoded JSON body, or ``{"raw": <response text>}`` when the
            body cannot be decoded.

        Raises:
            MetaClientError: If the response status is 400 or higher.
        """
        token = await self._page_access_token(page_context)
        endpoint = f"{self.base_url}/{path.lstrip('/')}"
        auth_headers = {"Authorization": f"Bearer {token}"}
        async with httpx.AsyncClient(timeout=20.0) as client:
            response = await client.get(endpoint, headers=auth_headers, params=params or {})
        try:
            decoded = response.json()
        except Exception:
            # Keep the raw text so error messages still show what came back.
            decoded = {"raw": response.text}
        if response.status_code < 400:
            return decoded
        raise MetaClientError(f"Meta API error {response.status_code} for {path}: {decoded}")

    async def _delete(
        self,
        page_context: PageContext,
        path: str,
        params: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """DELETE a Graph API path using the Page access token.

        Args:
            page_context: Identifies the Page whose token authorizes the call.
            path: Graph path relative to the versioned base URL; leading
                slashes are ignored.
            params: Query parameters; an empty mapping is sent when omitted.

        Returns:
            The decoded JSON body, or ``{"raw": <response text>}`` when the
            body cannot be decoded.

        Raises:
            MetaClientError: If the response status is 400 or higher.
        """
        token = await self._page_access_token(page_context)
        endpoint = f"{self.base_url}/{path.lstrip('/')}"
        auth_headers = {"Authorization": f"Bearer {token}"}
        async with httpx.AsyncClient(timeout=20.0) as client:
            response = await client.delete(endpoint, headers=auth_headers, params=params or {})
        try:
            decoded = response.json()
        except Exception:
            # Keep the raw text so error messages still show what came back.
            decoded = {"raw": response.text}
        if response.status_code < 400:
            return decoded
        raise MetaClientError(f"Meta API error {response.status_code} for {path}: {decoded}")

    async def list_page_edge(
        self,
        page_context: PageContext,
        edge: str,
        fields: str,
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """Fetch one page of results from ``/{page_id}/{edge}``.

        Args:
            page_context: Identifies the Page to query.
            edge: Name of the edge under the Page node.
            fields: Value sent as the ``fields`` query parameter.
            limit: Value sent as the ``limit`` query parameter.
            after: Sent as ``after`` when truthy.
            before: Sent as ``before`` when truthy.

        Returns:
            The decoded Graph API response.

        Raises:
            MetaClientError: If the Graph API responds with an error status.
        """
        creds = get_page_credentials(page_context)
        query: dict[str, Any] = {"fields": fields, "limit": limit}
        # Cursors are optional; only forward the ones the caller supplied.
        for cursor_name, cursor_value in (("after", after), ("before", before)):
            if cursor_value:
                query[cursor_name] = cursor_value
        return await self._get(page_context, f"/{creds.page_id}/{edge}", params=query)

    async def list_page_posts(
        self,
        page_context: PageContext,
        edge: str = "posts",
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """List posts from one of the Page's post edges.

        Args:
            page_context: Identifies the Page to query.
            edge: One of ``posts``, ``feed``, ``published_posts`` or
                ``scheduled_posts``.
            limit: Page size forwarded to the Graph API.
            after: Optional forward cursor.
            before: Optional backward cursor.

        Raises:
            MetaClientError: If ``edge`` is not one of the supported edges, or
                the Graph API responds with an error status.
        """
        supported = frozenset(("posts", "feed", "published_posts", "scheduled_posts"))
        if edge not in supported:
            raise MetaClientError(f"Unsupported Page post edge: {edge}")
        post_fields = ",".join(
            (
                "id",
                "message",
                "created_time",
                "updated_time",
                "permalink_url",
                "status_type",
                "is_published",
                "scheduled_publish_time",
                "attachments{media_type,title,url,target}",
            )
        )
        return await self.list_page_edge(page_context, edge, post_fields, limit, after, before)

    async def list_page_media(
        self,
        page_context: PageContext,
        media_type: str = "all",
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """List Page media from the ``photos``, ``videos`` and/or ``video_reels`` edges.

        Args:
            page_context: Identifies the Page to query.
            media_type: A single edge name, or ``"all"`` to query every
                supported edge in turn.
            limit: Page size forwarded to each edge request.
            after: Optional forward cursor, forwarded to each edge request.
            before: Optional backward cursor, forwarded to each edge request.

        Returns:
            ``{"media": {edge_name: graph_response, ...}}``.

        Raises:
            MetaClientError: If ``media_type`` is not supported, or a Graph
                API request fails.
        """
        video_fields = "id,title,description,created_time,updated_time,permalink_url,status"
        fields_by_edge = {
            "photos": "id,name,created_time,updated_time,link,images,album",
            "videos": video_fields,
            "video_reels": video_fields,
        }
        if media_type == "all":
            edges = list(fields_by_edge)
        elif media_type in fields_by_edge:
            edges = [media_type]
        else:
            raise MetaClientError(f"Unsupported media_type: {media_type}")

        # NOTE(review): with media_type="all" the same after/before cursors are
        # sent to every edge — confirm callers only pass cursors for a single edge.
        per_edge: dict[str, Any] = {}
        for name in edges:
            per_edge[name] = await self.list_page_edge(
                page_context, name, fields_by_edge[name], limit, after, before
            )
        return {"media": per_edge}

    async def list_conversations(
        self,
        page_context: PageContext,
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """Fetch one page of results from the Page's ``conversations`` edge."""
        return await self.list_page_edge(
            page_context,
            edge="conversations",
            fields="id,updated_time,link,message_count,unread_count,participants",
            limit=limit,
            after=after,
            before=before,
        )

    async def get_page_insights(
        self,
        page_context: PageContext,
        metrics: list[str],
        period: str = "day",
        since: str | None = None,
        until: str | None = None,
    ) -> dict[str, Any]:
        """Fetch Page insights for the requested metrics.

        Args:
            page_context: Identifies the Page to query.
            metrics: Metric names; sent comma-separated as ``metric``. A single
                metric name passed as a plain string is also accepted.
            period: Value sent as the ``period`` query parameter.
            since: Sent as ``since`` when truthy.
            until: Sent as ``until`` when truthy.

        Returns:
            The decoded Graph API response.

        Raises:
            MetaClientError: If no metric is given, or the Graph API responds
                with an error status.
        """
        # A bare string is iterable, so joining it directly would produce
        # "p,a,g,e,..." instead of the metric name.
        metric_names = [metrics] if isinstance(metrics, str) else list(metrics)
        if not metric_names:
            # Fail before the network round trip rather than sending metric="".
            raise MetaClientError("At least one insights metric is required")

        creds = get_page_credentials(page_context)
        params: dict[str, Any] = {"metric": ",".join(metric_names), "period": period}
        if since:
            params["since"] = since
        if until:
            params["until"] = until
        return await self._get(page_context, f"/{creds.page_id}/insights", params=params)

    async def list_webhook_subscriptions(
        self,
        page_context: PageContext,
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """Fetch one page of results from the Page's ``subscribed_apps`` edge."""
        return await self.list_page_edge(
            page_context,
            edge="subscribed_apps",
            fields="id,name,subscribed_fields,category",
            limit=limit,
            after=after,
            before=before,
        )

    async def list_page_settings(
        self,
        page_context: PageContext,
        limit: int = 25,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """Fetch one page of results from the Page's ``settings`` edge."""
        return await self.list_page_edge(
            page_context,
            edge="settings",
            fields="setting,description,value",
            limit=limit,
            after=after,
            before=before,
        )

    async def publish_page_post(
        self,
        page_context: PageContext,
        message: str,
        publish_mode: PublishMode = PublishMode.now,
        scheduled_publish_time: datetime | None = None,
    ) -> dict[str, Any]:
        """Publish, schedule, or dry-run a text post on the Page feed.

        Args:
            page_context: Identifies the Page to post to.
            message: Post text.
            publish_mode: ``validation_only`` builds the payload without
                calling the Graph API; other modes POST it to the feed edge.
            scheduled_publish_time: Publish time used when scheduling.

        Returns:
            For ``validation_only``: a dict with ``publish_attempted`` False,
            the ``graph_path`` and the ``payload`` that would be sent.
            Otherwise the decoded Graph API response.

        Raises:
            MetaClientError: If the schedule is invalid or the Graph API
                responds with an error status.
        """
        creds = get_page_credentials(page_context)
        payload = self.build_page_post_payload(message, publish_mode, scheduled_publish_time)
        feed_path = f"/{creds.page_id}/feed"
        if publish_mode != PublishMode.validation_only:
            return await self._post(page_context, feed_path, data_payload=payload)
        # Dry run: report what would be sent without touching the API.
        return {"publish_attempted": False, "graph_path": feed_path, "payload": payload}

    async def publish_page_photo_post(
        self,
        page_context: PageContext,
        message: str,
        media: dict[str, Any],
    ) -> dict[str, Any]:
        """Upload a local image to the Page's ``photos`` edge as a published photo post.

        Args:
            page_context: Identifies the Page to post to.
            message: Sent as the photo ``caption``.
            media: Must contain ``media_path``; ``mime_type`` and ``alt_text``
                are used when present and truthy.

        Returns:
            The decoded Graph API response.

        Raises:
            MetaClientError: If the Graph API responds with an error status.
        """
        creds = get_page_credentials(page_context)
        image_path = Path(media["media_path"])

        form = {"caption": message, "published": "true"}
        alt_text = media.get("alt_text")
        if alt_text:
            form["alt_text_custom"] = str(alt_text)
        content_type = media.get("mime_type") or "application/octet-stream"

        # The handle must stay open until the request has been sent.
        with image_path.open("rb") as handle:
            upload = {"source": (image_path.name, handle, content_type)}
            return await self._post(
                page_context,
                f"/{creds.page_id}/photos",
                data_payload=form,
                files=upload,
            )

    async def publish_page_post_with_media(
        self,
        page_context: PageContext,
        message: str,
        media_attachments: list[dict[str, Any]],
        publish_mode: PublishMode = PublishMode.now,
        scheduled_publish_time: datetime | None = None,
    ) -> dict[str, Any]:
        """Publish a Page post, routing to the photo endpoint when media is attached.

        With no attachments this delegates to ``publish_page_post``. With
        attachments, ``validation_only`` returns a description of what would
        happen, ``scheduled`` is rejected, and otherwise exactly one attachment
        with ``media_type == "image"`` is published as a Page photo.

        Raises:
            MetaClientError: For scheduled media posts, for anything other
                than a single image, or when the Graph API reports an error.
        """
        if not media_attachments:
            return await self.publish_page_post(page_context, message, publish_mode, scheduled_publish_time)

        if publish_mode == PublishMode.validation_only:
            preview = {
                "publish_attempted": False,
                "media_count": len(media_attachments),
                "media_mode": "page_photo",
                "note": "single-image Page photo publish would be used for now-mode media posts",
            }
            return preview

        if publish_mode == PublishMode.scheduled:
            raise MetaClientError("Scheduled Page posts with media are not enabled yet; use validation_only or now")

        is_single_image = (
            len(media_attachments) == 1
            and media_attachments[0].get("media_type") == "image"
        )
        if not is_single_image:
            raise MetaClientError("Only single-image Page posts are currently supported for direct media publishing")
        return await self.publish_page_photo_post(page_context, message, media_attachments[0])

    async def publish_reel(
        self,
        page_context: PageContext,
        video: dict[str, Any],
        description: str,
        title: str | None = None,
    ) -> dict[str, Any]:
        """Upload a local video file and publish it as a Page Reel.

        Runs the three-phase flow against ``/{page_id}/video_reels``:

        1. ``upload_phase=start`` to obtain ``video_id`` and ``upload_url``.
        2. POST the file bytes to ``upload_url``.
        3. ``upload_phase=finish`` with the description (and title if given).

        Args:
            page_context: Identifies the Page to publish to.
            video: Must contain ``media_path`` pointing at the video file.
            description: Reel description sent in the finish phase.
            title: Optional title sent in the finish phase.

        Returns:
            A dict with ``video_id`` and the decoded ``start``, ``upload`` and
            ``finish`` responses.

        Raises:
            MetaClientError: If the start response lacks upload fields, the
                upload returns an error status, or a Graph API call fails.
        """
        creds = get_page_credentials(page_context)
        reels_path = f"/{creds.page_id}/video_reels"

        start = await self._post(page_context, reels_path, data_payload={"upload_phase": "start"})
        video_id = start.get("video_id")
        upload_url = start.get("upload_url")
        if not video_id or not upload_url:
            raise MetaClientError(f"Meta Reels start response missing upload fields: {start}")

        page_access_token = await self._page_access_token(page_context)
        path = Path(video["media_path"])
        file_size = path.stat().st_size
        chunk_size = 1024 * 1024

        async with httpx.AsyncClient(timeout=300.0) as client:
            with path.open("rb") as fh:

                async def file_chunks():
                    # httpx.AsyncClient rejects a synchronous file object or
                    # iterator as the request body, so expose the file as an
                    # async byte stream instead of passing ``fh`` directly.
                    while chunk := fh.read(chunk_size):
                        yield chunk

                upload_response = await client.post(
                    upload_url,
                    headers={
                        "Authorization": f"OAuth {page_access_token}",
                        # The upload endpoint expects the starting byte offset
                        # alongside the total size; 0 means a fresh upload.
                        "offset": "0",
                        "file_size": str(file_size),
                        # Explicit length so the streamed body is not sent
                        # with chunked transfer encoding.
                        "Content-Length": str(file_size),
                    },
                    content=file_chunks(),
                )
        try:
            upload_body = upload_response.json()
        except Exception:
            upload_body = {"raw": upload_response.text}
        if upload_response.status_code >= 400:
            raise MetaClientError(f"Meta Reels upload error {upload_response.status_code}: {upload_body}")

        data = {
            "upload_phase": "finish",
            "video_id": video_id,
            # Ask for the reel to be published when the upload is finalized.
            "video_state": "PUBLISHED",
            "description": description,
        }
        if title:
            data["title"] = title
        finish = await self._post(page_context, reels_path, data_payload=data)
        return {"video_id": video_id, "start": start, "upload": upload_body, "finish": finish}

    async def reply_to_comment(
        self,
        page_context: PageContext,
        comment_id: str,
        message: str,
    ) -> dict[str, Any]:
        """Post ``message`` to the ``comments`` edge of ``comment_id`` and return the Graph API response."""
        reply_path = f"/{comment_id}/comments"
        form = {"message": message}
        return await self._post(page_context, reply_path, data_payload=form)

    async def read_comment_thread(
        self,
        page_context: PageContext,
        object_id: str,
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """Fetch one page of comments from ``/{object_id}/comments``.

        Args:
            page_context: Identifies the Page whose token authorizes the call.
            object_id: Graph object whose ``comments`` edge is read.
            limit: Value sent as the ``limit`` query parameter.
            after: Sent as ``after`` when truthy.
            before: Sent as ``before`` when truthy.

        Raises:
            MetaClientError: If the Graph API responds with an error status.
        """
        comment_fields = ",".join(
            (
                "id",
                "message",
                "created_time",
                "from",
                "can_comment",
                "can_hide",
                "can_like",
                "comment_count",
                "is_hidden",
                "parent{id}",
            )
        )
        query: dict[str, Any] = {"fields": comment_fields, "limit": limit}
        for cursor_name, cursor_value in (("after", after), ("before", before)):
            if cursor_value:
                query[cursor_name] = cursor_value
        return await self._get(page_context, f"/{object_id}/comments", params=query)

    async def moderate_comment(self, page_context: PageContext, comment_id: str, action: str) -> dict[str, Any]:
        if action == "hide":
            return await self._post(page_context, f"/{comment_id}", data_payload={"is_hidden": "true"})
        if action == "unhide":
            return await self._post(page_context, f"/{comment_id}", data_payload={"is_hidden": "false"})
        if action == "delete":
            return await self._delete(page_context, f"/{comment_id}")
        raise MetaClientError(f"Unsupported comment moderation action: {action}")

    async def send_messenger_message(
        self,
        page_context: PageContext,
        recipient_psid: str,
        message: str,
        messaging_type: str = "RESPONSE",
    ) -> dict[str, Any]:
        """Send a text message by POSTing a JSON body to ``/{page_id}/messages``.

        Args:
            page_context: Identifies the sending Page.
            recipient_psid: Sent as ``recipient.id``.
            message: Sent as ``message.text``.
            messaging_type: Sent as ``messaging_type``.

        Raises:
            MetaClientError: If the Graph API responds with an error status.
        """
        creds = get_page_credentials(page_context)
        envelope = {
            "recipient": {"id": recipient_psid},
            "messaging_type": messaging_type,
            "message": {"text": message},
        }
        messages_path = f"/{creds.page_id}/messages"
        return await self._post(page_context, messages_path, json_payload=envelope)

    async def read_conversation_detail(
        self,
        page_context: PageContext,
        conversation_id: str,
        limit: int = 10,
        after: str | None = None,
        before: str | None = None,
    ) -> dict[str, Any]:
        """Fetch a conversation node together with a nested page of its messages.

        Args:
            page_context: Identifies the Page whose token authorizes the call.
            conversation_id: Conversation node to read.
            limit: Used as the ``.limit(...)`` of the nested ``messages``
                expansion.
            after: Sent as a top-level ``after`` query parameter when truthy.
            before: Sent as a top-level ``before`` query parameter when truthy.

        Raises:
            MetaClientError: If the Graph API responds with an error status.
        """
        message_fields = "id,message,created_time,from,to"
        messages_expansion = f"messages.limit({limit}){{{message_fields}}}"
        conversation_fields = "id,updated_time,link,message_count,unread_count,participants"
        query: dict[str, Any] = {"fields": f"{conversation_fields},{messages_expansion}"}
        # NOTE(review): the cursors are sent on the conversation node, not on the
        # nested messages expansion — confirm they actually page the messages.
        for cursor_name, cursor_value in (("after", after), ("before", before)):
            if cursor_value:
                query[cursor_name] = cursor_value
        return await self._get(page_context, f"/{conversation_id}", params=query)

    async def messenger_response_window(
        self,
        page_context: PageContext,
        conversation_id: str,
    ) -> dict[str, Any]:
        """Summarize the latest message time of a conversation (read-only).

        Reads the conversation with up to 5 nested messages and reports the
        ``created_time`` of the first message returned, plus the
        conversation's ``message_count``. No eligibility decision is made; the
        status is always ``inspect_latest_message_time``.
        """
        detail = await self.read_conversation_detail(page_context, conversation_id, limit=5)
        detail_is_dict = isinstance(detail, dict)

        thread = detail.get("messages", {}).get("data", []) if detail_is_dict else []
        # Assumes the first returned message is the newest — TODO confirm ordering.
        latest = thread[0] if thread else None

        if isinstance(latest, dict):
            latest_time = latest.get("created_time")
        else:
            latest_time = None
        total_messages = detail.get("message_count") if detail_is_dict else None

        return {
            "conversation_id": conversation_id,
            "latest_message_time": latest_time,
            "message_count": total_messages,
            "response_window_status": "inspect_latest_message_time",
            "note": "Policy-window eligibility depends on the latest inbound user message and allowed messaging_type/tag; this endpoint is read-only.",
        }