openclaw-meta-bridge / app /services /meta_client.py
Ordo
Initial public release
0e84a1f
from collections.abc import AsyncIterator
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any

import httpx

from ..config import get_settings
from ..models import PageContext, PublishMode
from .token_resolver import get_page_credentials
class MetaClientError(RuntimeError):
    """Raised when a Meta Graph API call fails or a request is rejected locally."""
class MetaClient:
    """Async client for Page-scoped calls against the Meta Graph API.

    Every request is authorized with a Page access token resolved through
    ``_page_access_token``. Graph responses with an HTTP status of 400 or
    above are raised as ``MetaClientError``; transport-level ``httpx``
    exceptions are not wrapped and propagate to the caller unchanged.
    """

    # Timeout (seconds) for ordinary Graph API calls.
    _DEFAULT_TIMEOUT = 20.0
    # Timeout (seconds) for the Reels binary upload.
    _UPLOAD_TIMEOUT = 300.0
    # Number of bytes read from disk per chunk while streaming an upload.
    _UPLOAD_CHUNK_SIZE = 1024 * 1024

    def __init__(self) -> None:
        """Load settings, derive the versioned base URL and start with an empty token cache."""
        self.settings = get_settings()
        self.base_url = f"https://graph.facebook.com/{self.settings.meta_graph_version}"
        self._page_access_token_cache: dict[PageContext, str] = {}

    @staticmethod
    def _decode_body(response: httpx.Response) -> Any:
        """Return the decoded JSON body, or ``{"raw": <text>}`` when it is not valid JSON."""
        try:
            return response.json()
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
            return {"raw": response.text}

    @staticmethod
    async def _iter_file_chunks(path: Path, chunk_size: int = _UPLOAD_CHUNK_SIZE) -> AsyncIterator[bytes]:
        """Yield the file at *path* as consecutive byte chunks.

        ``httpx.AsyncClient`` only accepts ``bytes`` or an *async* iterable as
        request ``content``; exposing the file through an async generator
        lets it be streamed without loading it fully into memory. The reads
        themselves are ordinary blocking reads of at most *chunk_size* bytes.
        """
        with path.open("rb") as fh:
            while chunk := fh.read(chunk_size):
                yield chunk

    async def _page_access_token(self, page_context: PageContext) -> str:
        """Return the access token used to authorize calls for *page_context*.

        The configured token is exchanged once for the Page's own
        ``access_token`` field and the result is cached per page context. If
        the exchange does not yield a token (error status, non-JSON body, or
        no ``access_token`` field) the configured token is returned as-is and
        nothing is cached, so the exchange is retried on the next call.
        """
        # Credentials are resolved before the cache lookup, as in the original flow,
        # so any error raised by the resolver surfaces even on a cache hit.
        creds = get_page_credentials(page_context)
        cached = self._page_access_token_cache.get(page_context)
        if cached:
            return cached

        url = f"{self.base_url}/{creds.page_id}"
        async with httpx.AsyncClient(timeout=self._DEFAULT_TIMEOUT) as client:
            response = await client.get(
                url,
                headers={"Authorization": f"Bearer {creds.page_token}"},
                params={"fields": "access_token"},
            )
        try:
            body = response.json()
        except ValueError:
            body = {}

        # Guard against valid JSON that is not an object (list, string, ...),
        # which would otherwise raise AttributeError on ``.get``.
        token = None
        if response.status_code < 400 and isinstance(body, dict):
            token = body.get("access_token")
        if token:
            self._page_access_token_cache[page_context] = token
            return token
        # Backward-compatible fallback for environments that still provide Page tokens directly.
        return creds.page_token

    async def _request(self, method: str, page_context: PageContext, path: str, **request_kwargs: Any) -> dict[str, Any]:
        """Send one authorized Graph API request and return its decoded body.

        Args:
            method: HTTP verb passed to ``httpx.AsyncClient.request``.
            page_context: Page whose access token authorizes the call.
            path: Graph path relative to the versioned base URL; leading
                slashes are stripped before joining.
            **request_kwargs: Forwarded to ``httpx`` (``params``, ``json``,
                ``data``, ``files``).

        Raises:
            MetaClientError: If the response status is 400 or above.
        """
        page_access_token = await self._page_access_token(page_context)
        url = f"{self.base_url}/{path.lstrip('/')}"
        async with httpx.AsyncClient(timeout=self._DEFAULT_TIMEOUT) as client:
            response = await client.request(
                method,
                url,
                headers={"Authorization": f"Bearer {page_access_token}"},
                **request_kwargs,
            )
        body = self._decode_body(response)
        if response.status_code >= 400:
            raise MetaClientError(f"Meta API error {response.status_code} for {path}: {body}")
        return body

    async def _post(self, page_context: PageContext, path: str, json_payload: dict[str, Any] | None = None, data_payload: dict[str, Any] | None = None, files: dict[str, Any] | None = None) -> dict[str, Any]:
        """POST to *path* with an optional JSON body, form fields and/or multipart files.

        Raises:
            MetaClientError: If the response status is 400 or above.
        """
        return await self._request("POST", page_context, path, json=json_payload, data=data_payload, files=files)

    def _scheduled_timestamp(self, scheduled_publish_time: datetime | None) -> int:
        """Validate a scheduled publish time and return it as a Unix timestamp.

        Naive datetimes are interpreted as UTC. The time must lie between
        10 minutes and 75 days from now.

        Raises:
            MetaClientError: If the time is missing or outside that window.
        """
        if scheduled_publish_time is None:
            raise MetaClientError("scheduled_publish_time is required for scheduled Page posts")
        scheduled = scheduled_publish_time
        if scheduled.tzinfo is None:
            scheduled = scheduled.replace(tzinfo=timezone.utc)
        scheduled = scheduled.astimezone(timezone.utc)
        now = datetime.now(timezone.utc)
        if scheduled < now + timedelta(minutes=10):
            raise MetaClientError("scheduled_publish_time must be at least 10 minutes in the future")
        if scheduled > now + timedelta(days=75):
            raise MetaClientError("scheduled_publish_time must be within 75 days")
        return int(scheduled.timestamp())

    def build_page_post_payload(self, message: str, publish_mode: PublishMode = PublishMode.now, scheduled_publish_time: datetime | None = None) -> dict[str, Any]:
        """Build the form payload for a Page feed post.

        * ``scheduled``: ``published`` is ``"false"`` and a validated
          ``scheduled_publish_time`` is required.
        * ``validation_only``: the scheduling fields are added only when a
          time is supplied; otherwise ``published`` is left unset.
        * any other mode: ``published`` is ``"true"``.

        Raises:
            MetaClientError: If a required or supplied schedule time is invalid.
        """
        payload: dict[str, Any] = {"message": message}
        if publish_mode == PublishMode.scheduled:
            payload["published"] = "false"
            payload["scheduled_publish_time"] = str(self._scheduled_timestamp(scheduled_publish_time))
        elif publish_mode == PublishMode.validation_only:
            if scheduled_publish_time:
                payload["published"] = "false"
                payload["scheduled_publish_time"] = str(self._scheduled_timestamp(scheduled_publish_time))
        else:
            payload["published"] = "true"
        return payload

    async def _get(self, page_context: PageContext, path: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
        """GET *path* with optional query parameters.

        Raises:
            MetaClientError: If the response status is 400 or above.
        """
        return await self._request("GET", page_context, path, params=params or {})

    async def _delete(self, page_context: PageContext, path: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
        """DELETE *path* with optional query parameters.

        Raises:
            MetaClientError: If the response status is 400 or above.
        """
        return await self._request("DELETE", page_context, path, params=params or {})

    async def list_page_edge(self, page_context: PageContext, edge: str, fields: str, limit: int = 10, after: str | None = None, before: str | None = None) -> dict[str, Any]:
        """Fetch one page of results from ``/{page_id}/{edge}``.

        Args:
            page_context: Page to query.
            edge: Edge name appended to the Page id.
            fields: Comma-separated field list sent as ``fields``.
            limit: Value sent as ``limit``.
            after: Optional ``after`` cursor; omitted when falsy.
            before: Optional ``before`` cursor; omitted when falsy.
        """
        creds = get_page_credentials(page_context)
        params: dict[str, Any] = {"fields": fields, "limit": limit}
        if after:
            params["after"] = after
        if before:
            params["before"] = before
        return await self._get(page_context, f"/{creds.page_id}/{edge}", params=params)

    async def list_page_posts(self, page_context: PageContext, edge: str = "posts", limit: int = 10, after: str | None = None, before: str | None = None) -> dict[str, Any]:
        """List Page posts from one of the supported post edges.

        Raises:
            MetaClientError: If *edge* is not ``posts``, ``feed``,
                ``published_posts`` or ``scheduled_posts``.
        """
        allowed_edges = {"posts", "feed", "published_posts", "scheduled_posts"}
        if edge not in allowed_edges:
            raise MetaClientError(f"Unsupported Page post edge: {edge}")
        fields = "id,message,created_time,updated_time,permalink_url,status_type,is_published,scheduled_publish_time,attachments{media_type,title,url,target}"
        return await self.list_page_edge(page_context, edge, fields, limit, after, before)

    async def list_page_media(self, page_context: PageContext, media_type: str = "all", limit: int = 10, after: str | None = None, before: str | None = None) -> dict[str, Any]:
        """List Page media, keyed by edge name under ``"media"``.

        *media_type* is ``"photos"``, ``"videos"``, ``"video_reels"`` or
        ``"all"`` (every edge, queried one after another).

        Raises:
            MetaClientError: If *media_type* is not one of the above.
        """
        edge_fields = {
            "photos": "id,name,created_time,updated_time,link,images,album",
            "videos": "id,title,description,created_time,updated_time,permalink_url,status",
            "video_reels": "id,title,description,created_time,updated_time,permalink_url,status",
        }
        requested = list(edge_fields) if media_type == "all" else [media_type]
        if any(edge not in edge_fields for edge in requested):
            raise MetaClientError(f"Unsupported media_type: {media_type}")
        results: dict[str, Any] = {}
        # NOTE(review): the same after/before cursors are sent to every edge when
        # media_type is "all"; confirm callers only paginate a single edge at a time.
        for edge in requested:
            results[edge] = await self.list_page_edge(page_context, edge, edge_fields[edge], limit, after, before)
        return {"media": results}

    async def list_conversations(self, page_context: PageContext, limit: int = 10, after: str | None = None, before: str | None = None) -> dict[str, Any]:
        """List the Page's ``conversations`` edge with summary fields."""
        fields = "id,updated_time,link,message_count,unread_count,participants"
        return await self.list_page_edge(page_context, "conversations", fields, limit, after, before)

    async def get_page_insights(self, page_context: PageContext, metrics: list[str], period: str = "day", since: str | None = None, until: str | None = None) -> dict[str, Any]:
        """Query ``/{page_id}/insights`` for the given metrics.

        Args:
            page_context: Page to query.
            metrics: Metric names, joined with commas into ``metric``.
            period: Value sent as ``period``.
            since: Optional ``since`` bound; omitted when falsy.
            until: Optional ``until`` bound; omitted when falsy.
        """
        creds = get_page_credentials(page_context)
        params: dict[str, Any] = {"metric": ",".join(metrics), "period": period}
        if since:
            params["since"] = since
        if until:
            params["until"] = until
        return await self._get(page_context, f"/{creds.page_id}/insights", params=params)

    async def list_webhook_subscriptions(self, page_context: PageContext, limit: int = 10, after: str | None = None, before: str | None = None) -> dict[str, Any]:
        """List the Page's ``subscribed_apps`` edge."""
        fields = "id,name,subscribed_fields,category"
        return await self.list_page_edge(page_context, "subscribed_apps", fields, limit, after, before)

    async def list_page_settings(self, page_context: PageContext, limit: int = 25, after: str | None = None, before: str | None = None) -> dict[str, Any]:
        """List the Page's ``settings`` edge."""
        fields = "setting,description,value"
        return await self.list_page_edge(page_context, "settings", fields, limit, after, before)

    async def publish_page_post(self, page_context: PageContext, message: str, publish_mode: PublishMode = PublishMode.now, scheduled_publish_time: datetime | None = None) -> dict[str, Any]:
        """Publish or schedule a text post on the Page feed.

        In ``validation_only`` mode nothing is sent; the Graph path and the
        payload that would have been posted are returned instead.

        Raises:
            MetaClientError: On an invalid schedule time or a Graph API error.
        """
        creds = get_page_credentials(page_context)
        payload = self.build_page_post_payload(message, publish_mode, scheduled_publish_time)
        if publish_mode == PublishMode.validation_only:
            return {"publish_attempted": False, "graph_path": f"/{creds.page_id}/feed", "payload": payload}
        return await self._post(page_context, f"/{creds.page_id}/feed", data_payload=payload)

    async def publish_page_photo_post(self, page_context: PageContext, message: str, media: dict[str, Any]) -> dict[str, Any]:
        """Upload a single photo to ``/{page_id}/photos`` with *message* as caption.

        *media* must contain ``media_path``; ``mime_type`` and ``alt_text``
        are optional.

        Raises:
            MetaClientError: If the Graph API returns an error status.
        """
        creds = get_page_credentials(page_context)
        path = Path(media["media_path"])
        data = {"caption": message, "published": "true"}
        if media.get("alt_text"):
            data["alt_text_custom"] = str(media["alt_text"])
        # The request is awaited inside the ``with`` so the handle stays open
        # for the whole multipart upload.
        with path.open("rb") as fh:
            files = {"source": (path.name, fh, media.get("mime_type") or "application/octet-stream")}
            return await self._post(page_context, f"/{creds.page_id}/photos", data_payload=data, files=files)

    async def publish_page_post_with_media(self, page_context: PageContext, message: str, media_attachments: list[dict[str, Any]], publish_mode: PublishMode = PublishMode.now, scheduled_publish_time: datetime | None = None) -> dict[str, Any]:
        """Publish a Page post, using the photo endpoint when media is attached.

        Without attachments this delegates to ``publish_page_post``. With
        attachments, ``validation_only`` returns a description of what would
        be done, ``scheduled`` is rejected, and otherwise exactly one
        attachment with ``media_type == "image"`` is supported.

        Raises:
            MetaClientError: For scheduled media posts, unsupported
                attachment combinations, or Graph API errors.
        """
        if not media_attachments:
            return await self.publish_page_post(page_context, message, publish_mode, scheduled_publish_time)
        if publish_mode == PublishMode.validation_only:
            return {"publish_attempted": False, "media_count": len(media_attachments), "media_mode": "page_photo", "note": "single-image Page photo publish would be used for now-mode media posts"}
        if publish_mode == PublishMode.scheduled:
            raise MetaClientError("Scheduled Page posts with media are not enabled yet; use validation_only or now")
        if len(media_attachments) == 1 and media_attachments[0].get("media_type") == "image":
            return await self.publish_page_photo_post(page_context, message, media_attachments[0])
        raise MetaClientError("Only single-image Page posts are currently supported for direct media publishing")

    async def publish_reel(self, page_context: PageContext, video: dict[str, Any], description: str, title: str | None = None) -> dict[str, Any]:
        """Publish a Reel via the three-phase ``video_reels`` flow.

        Phases: ``start`` (obtain ``video_id`` and ``upload_url``), binary
        upload of the file at ``video["media_path"]``, then ``finish`` with
        the description and optional title.

        Returns:
            A dict with ``video_id`` and the decoded ``start``, ``upload``
            and ``finish`` responses.

        Raises:
            MetaClientError: If any phase returns an error status or the
                start response lacks the upload fields.
        """
        creds = get_page_credentials(page_context)
        # Resolve and stat the file first so a missing path fails before an
        # upload session is opened on the Meta side.
        path = Path(video["media_path"])
        file_size = path.stat().st_size
        reels_path = f"/{creds.page_id}/video_reels"

        start = await self._post(page_context, reels_path, data_payload={"upload_phase": "start"})
        video_id = start.get("video_id")
        upload_url = start.get("upload_url")
        if not video_id or not upload_url:
            raise MetaClientError(f"Meta Reels start response missing upload fields: {start}")

        page_access_token = await self._page_access_token(page_context)
        async with httpx.AsyncClient(timeout=self._UPLOAD_TIMEOUT) as client:
            upload_response = await client.post(
                upload_url,
                headers={
                    "Authorization": f"OAuth {page_access_token}",
                    # The upload starts at the beginning of the file.
                    "offset": "0",
                    "file_size": str(file_size),
                    # Explicit length keeps httpx from switching to chunked
                    # transfer encoding for the streamed body.
                    "Content-Length": str(file_size),
                },
                # A plain file object is a sync iterable, which AsyncClient
                # refuses to send; stream it through an async generator instead.
                content=self._iter_file_chunks(path),
            )
        upload_body = self._decode_body(upload_response)
        if upload_response.status_code >= 400:
            raise MetaClientError(f"Meta Reels upload error {upload_response.status_code}: {upload_body}")

        data = {"upload_phase": "finish", "video_id": video_id, "description": description}
        if title:
            data["title"] = title
        finish = await self._post(page_context, reels_path, data_payload=data)
        return {"video_id": video_id, "start": start, "upload": upload_body, "finish": finish}

    async def reply_to_comment(self, page_context: PageContext, comment_id: str, message: str) -> dict[str, Any]:
        """Post *message* as a reply on ``/{comment_id}/comments``."""
        return await self._post(page_context, f"/{comment_id}/comments", data_payload={"message": message})

    async def read_comment_thread(self, page_context: PageContext, object_id: str, limit: int = 10, after: str | None = None, before: str | None = None) -> dict[str, Any]:
        """Fetch one page of comments from ``/{object_id}/comments``.

        ``after`` / ``before`` cursors are sent only when truthy.
        """
        params: dict[str, Any] = {"fields": "id,message,created_time,from,can_comment,can_hide,can_like,comment_count,is_hidden,parent{id}", "limit": limit}
        if after:
            params["after"] = after
        if before:
            params["before"] = before
        return await self._get(page_context, f"/{object_id}/comments", params=params)

    async def moderate_comment(self, page_context: PageContext, comment_id: str, action: str) -> dict[str, Any]:
        """Hide, unhide or delete a comment.

        Raises:
            MetaClientError: If *action* is not ``hide``, ``unhide`` or
                ``delete``, or the Graph API returns an error status.
        """
        if action == "hide":
            return await self._post(page_context, f"/{comment_id}", data_payload={"is_hidden": "true"})
        if action == "unhide":
            return await self._post(page_context, f"/{comment_id}", data_payload={"is_hidden": "false"})
        if action == "delete":
            return await self._delete(page_context, f"/{comment_id}")
        raise MetaClientError(f"Unsupported comment moderation action: {action}")

    async def send_messenger_message(self, page_context: PageContext, recipient_psid: str, message: str, messaging_type: str = "RESPONSE") -> dict[str, Any]:
        """Send a text message to *recipient_psid* via ``/{page_id}/messages`` (JSON body)."""
        creds = get_page_credentials(page_context)
        payload = {"recipient": {"id": recipient_psid}, "messaging_type": messaging_type, "message": {"text": message}}
        return await self._post(page_context, f"/{creds.page_id}/messages", json_payload=payload)

    async def read_conversation_detail(self, page_context: PageContext, conversation_id: str, limit: int = 10, after: str | None = None, before: str | None = None) -> dict[str, Any]:
        """Fetch a conversation node with up to *limit* nested messages.

        The nested ``messages`` edge is requested through field expansion;
        ``after`` / ``before`` are sent as top-level query parameters when
        truthy.
        """
        message_fields = "id,message,created_time,from,to"
        params: dict[str, Any] = {"fields": f"id,updated_time,link,message_count,unread_count,participants,messages.limit({limit}){{{message_fields}}}"}
        if after:
            params["after"] = after
        if before:
            params["before"] = before
        return await self._get(page_context, f"/{conversation_id}", params=params)

    async def messenger_response_window(self, page_context: PageContext, conversation_id: str) -> dict[str, Any]:
        """Summarize a conversation's latest message time for response-window checks.

        Reads up to five messages and reports the ``created_time`` of the
        first one returned together with the conversation's
        ``message_count``. No eligibility decision is made here.
        """
        detail = await self.read_conversation_detail(page_context, conversation_id, limit=5)
        messages = detail.get("messages", {}).get("data", []) if isinstance(detail, dict) else []
        # NOTE(review): treats the first returned message as the newest — confirm
        # the ordering of the messages edge.
        newest = messages[0] if messages else None
        return {
            "conversation_id": conversation_id,
            "latest_message_time": newest.get("created_time") if isinstance(newest, dict) else None,
            "message_count": detail.get("message_count") if isinstance(detail, dict) else None,
            "response_window_status": "inspect_latest_message_time",
            "note": "Policy-window eligibility depends on the latest inbound user message and allowed messaging_type/tag; this endpoint is read-only.",
        }