Ordo
Initial public release
0e84a1f
from fastapi import APIRouter, Depends, Header, HTTPException, Query
from sqlmodel import Session
from ..config import get_settings
from ..db import get_session
from ..models import Draft, DraftStatus, RiskLevel, AuditLog, Channel, PageContext
from ..schemas import CreateDraftRequest, PublishPagePostRequest, PublishReelRequest, SendMessengerRequest, ReplyToCommentRequest, ModerateCommentRequest
from ..services.meta_client import MetaClient, MetaClientError
from ..services.media_validation import MediaValidationError, validate_page_post_media, validate_reel_asset
from ..services.risk_rules import classify_risk, max_risk
# Router for every endpoint defined in this module; all paths are mounted
# under the "/tools/meta" prefix and grouped under the "meta-tools" tag.
router = APIRouter(prefix="/tools/meta", tags=["meta-tools"])
# Allow-list of insight metric names. The insights endpoint in this module
# rejects any requested metric that is not in this set and falls back to the
# full (sorted) set when the caller does not name any metric.
VERIFIED_INSIGHT_METRICS = {
    "page_actions_post_reactions_total",
    "page_impressions_unique",
    "page_post_engagements",
    "page_total_actions",
    "page_video_views",
    "page_views_total",
}
def bounded_limit(limit: int, maximum: int = 100) -> int:
    """Clamp ``limit`` so that it is at least 1 and at most ``maximum``.

    The upper bound is applied first and the lower bound second, so the
    result is 1 whenever ``maximum`` itself is below 1.

    Args:
        limit: Requested page size.
        maximum: Largest page size allowed (default 100).

    Returns:
        The clamped page size.
    """
    capped = maximum if maximum < limit else limit
    return capped if capped > 1 else 1
def graph_error(exc: MetaClientError) -> HTTPException:
    """Translate a Meta client failure into an HTTP 502 (Bad Gateway) error.

    The exception is returned rather than raised, so callers write
    ``raise graph_error(exc)``.

    Args:
        exc: The error raised by the Meta client; its string form becomes
            the response detail.

    Returns:
        An ``HTTPException`` with status 502 and ``str(exc)`` as detail.
    """
    message = str(exc)
    return HTTPException(detail=message, status_code=502)
def require_internal_auth(x_meta_bridge_key: str | None = Header(default=None)):
    """FastAPI dependency that checks the internal bridge key header.

    Args:
        x_meta_bridge_key: Key supplied by the caller in the request header;
            ``None`` when the header is absent.

    Raises:
        HTTPException: 401 when the header is missing or empty, when no key
            is configured in the settings, or when the two keys differ.
    """
    import hmac

    expected = get_settings().meta_bridge_internal_api_key
    # Fail closed: with a plain ``!=`` comparison an unset key (None) would be
    # "matched" by a request that simply omits the header.
    if not expected or not x_meta_bridge_key:
        raise HTTPException(status_code=401, detail="Unauthorized")
    # Compare as bytes in constant time: compare_digest rejects non-ASCII str
    # arguments, and a timing-safe check avoids leaking the key via latency.
    if not hmac.compare_digest(x_meta_bridge_key.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Unauthorized")
@router.get("/page/posts", dependencies=[Depends(require_internal_auth)])
async def list_page_posts(
    page_context: str,
    edge: str = Query(default="posts", pattern="^(posts|feed|published_posts)$"),
    limit: int = Query(default=10, ge=1, le=100),
    after: str | None = None,
    before: str | None = None,
):
    """List posts of a page through the Meta client.

    Args:
        page_context: Name of the page context; must be a valid
            ``PageContext`` value.
        edge: Which post edge to read: ``posts``, ``feed`` or
            ``published_posts``.
        limit: Page size, 1-100.
        after: Forwarded unchanged to the client (presumably a paging cursor).
        before: Forwarded unchanged to the client (presumably a paging cursor).

    Returns:
        Whatever ``MetaClient.list_page_posts`` returns.

    Raises:
        HTTPException: 400 for an unknown ``page_context``; 502 when the
            Meta client reports an error.
    """
    # Only the conversion is guarded by ``except ValueError`` so that a
    # ValueError raised inside the client call is not misreported as a
    # caller mistake.
    try:
        context = PageContext(page_context)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") from exc
    try:
        return await MetaClient().list_page_posts(context, edge=edge, limit=bounded_limit(limit), after=after, before=before)
    except MetaClientError as exc:
        raise graph_error(exc) from exc
@router.get("/page/scheduled_posts", dependencies=[Depends(require_internal_auth)])
async def list_scheduled_page_posts(
    page_context: str,
    limit: int = Query(default=10, ge=1, le=100),
    after: str | None = None,
    before: str | None = None,
):
    """List scheduled posts of a page through the Meta client.

    Uses ``MetaClient.list_page_posts`` with the edge fixed to
    ``scheduled_posts``.

    Args:
        page_context: Name of the page context; must be a valid
            ``PageContext`` value.
        limit: Page size, 1-100.
        after: Forwarded unchanged to the client (presumably a paging cursor).
        before: Forwarded unchanged to the client (presumably a paging cursor).

    Returns:
        Whatever ``MetaClient.list_page_posts`` returns.

    Raises:
        HTTPException: 400 for an unknown ``page_context``; 502 when the
            Meta client reports an error.
    """
    # Only the conversion is guarded by ``except ValueError`` so that a
    # ValueError raised inside the client call is not misreported as a
    # caller mistake.
    try:
        context = PageContext(page_context)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") from exc
    try:
        return await MetaClient().list_page_posts(context, edge="scheduled_posts", limit=bounded_limit(limit), after=after, before=before)
    except MetaClientError as exc:
        raise graph_error(exc) from exc
@router.get("/page/media", dependencies=[Depends(require_internal_auth)])
async def list_page_media(
    page_context: str,
    media_type: str = Query(default="all", pattern="^(all|photos|videos|video_reels)$"),
    limit: int = Query(default=10, ge=1, le=100),
    after: str | None = None,
    before: str | None = None,
):
    """List media of a page through the Meta client.

    Args:
        page_context: Name of the page context; must be a valid
            ``PageContext`` value.
        media_type: One of ``all``, ``photos``, ``videos`` or ``video_reels``.
        limit: Page size, 1-100.
        after: Forwarded unchanged to the client (presumably a paging cursor).
        before: Forwarded unchanged to the client (presumably a paging cursor).

    Returns:
        Whatever ``MetaClient.list_page_media`` returns.

    Raises:
        HTTPException: 400 for an unknown ``page_context``; 502 when the
            Meta client reports an error.
    """
    # Only the conversion is guarded by ``except ValueError`` so that a
    # ValueError raised inside the client call is not misreported as a
    # caller mistake.
    try:
        context = PageContext(page_context)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") from exc
    try:
        return await MetaClient().list_page_media(context, media_type=media_type, limit=bounded_limit(limit), after=after, before=before)
    except MetaClientError as exc:
        raise graph_error(exc) from exc
@router.get("/conversations", dependencies=[Depends(require_internal_auth)])
async def list_conversations(
    page_context: str,
    limit: int = Query(default=10, ge=1, le=100),
    after: str | None = None,
    before: str | None = None,
):
    """List conversations of a page through the Meta client.

    Args:
        page_context: Name of the page context; must be a valid
            ``PageContext`` value.
        limit: Page size, 1-100.
        after: Forwarded unchanged to the client (presumably a paging cursor).
        before: Forwarded unchanged to the client (presumably a paging cursor).

    Returns:
        Whatever ``MetaClient.list_conversations`` returns.

    Raises:
        HTTPException: 400 for an unknown ``page_context``; 502 when the
            Meta client reports an error.
    """
    # Only the conversion is guarded by ``except ValueError`` so that a
    # ValueError raised inside the client call is not misreported as a
    # caller mistake.
    try:
        context = PageContext(page_context)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") from exc
    try:
        return await MetaClient().list_conversations(context, limit=bounded_limit(limit), after=after, before=before)
    except MetaClientError as exc:
        raise graph_error(exc) from exc
@router.get("/insights", dependencies=[Depends(require_internal_auth)])
async def get_page_insights(
    page_context: str,
    metrics: str | None = None,
    period: str = Query(default="day", pattern="^(day|week|days_28|month|lifetime)$"),
    since: str | None = None,
    until: str | None = None,
):
    """Fetch page insights for an allow-listed set of metrics.

    Args:
        page_context: Name of the page context; must be a valid
            ``PageContext`` value.
        metrics: Comma-separated metric names. Surrounding whitespace, blank
            entries and repeated names are ignored. When omitted (or when
            nothing remains after cleaning) every name in
            ``VERIFIED_INSIGHT_METRICS`` is requested, in sorted order.
        period: One of ``day``, ``week``, ``days_28``, ``month`` or
            ``lifetime``.
        since: Forwarded unchanged to the client.
        until: Forwarded unchanged to the client.

    Returns:
        Whatever ``MetaClient.get_page_insights`` returns.

    Raises:
        HTTPException: 400 when a metric is not allow-listed (the detail
            lists the invalid and the allowed names) or ``page_context`` is
            unknown; 502 when the Meta client reports an error.
    """
    allowed = sorted(VERIFIED_INSIGHT_METRICS)
    requested: list[str] = []
    if metrics:
        stripped = (part.strip() for part in metrics.split(","))
        # dict.fromkeys drops repeats while keeping first-seen order; blank
        # entries (from "a,,b" or a trailing comma) are skipped instead of
        # being rejected as an invalid metric named "".
        requested = list(dict.fromkeys(name for name in stripped if name))
    if not requested:
        requested = allowed

    invalid = [name for name in requested if name not in VERIFIED_INSIGHT_METRICS]
    if invalid:
        raise HTTPException(status_code=400, detail={"invalid_metrics": invalid, "allowed_metrics": allowed})

    # Only the conversion is guarded by ``except ValueError`` so that a
    # ValueError raised inside the client call is not misreported as a
    # caller mistake.
    try:
        context = PageContext(page_context)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") from exc
    try:
        return await MetaClient().get_page_insights(context, metrics=requested, period=period, since=since, until=until)
    except MetaClientError as exc:
        raise graph_error(exc) from exc
@router.get("/webhooks/subscriptions", dependencies=[Depends(require_internal_auth)])
async def list_webhook_subscriptions(
    page_context: str,
    limit: int = Query(default=10, ge=1, le=100),
    after: str | None = None,
    before: str | None = None,
):
    """List webhook subscriptions of a page through the Meta client.

    Args:
        page_context: Name of the page context; must be a valid
            ``PageContext`` value.
        limit: Page size, 1-100.
        after: Forwarded unchanged to the client (presumably a paging cursor).
        before: Forwarded unchanged to the client (presumably a paging cursor).

    Returns:
        Whatever ``MetaClient.list_webhook_subscriptions`` returns.

    Raises:
        HTTPException: 400 for an unknown ``page_context``; 502 when the
            Meta client reports an error.
    """
    # Only the conversion is guarded by ``except ValueError`` so that a
    # ValueError raised inside the client call is not misreported as a
    # caller mistake.
    try:
        context = PageContext(page_context)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") from exc
    try:
        return await MetaClient().list_webhook_subscriptions(context, limit=bounded_limit(limit), after=after, before=before)
    except MetaClientError as exc:
        raise graph_error(exc) from exc
@router.get("/settings", dependencies=[Depends(require_internal_auth)])
async def list_page_settings(
    page_context: str,
    limit: int = Query(default=25, ge=1, le=100),
    after: str | None = None,
    before: str | None = None,
):
    """List settings of a page through the Meta client.

    Args:
        page_context: Name of the page context; must be a valid
            ``PageContext`` value.
        limit: Page size, 1-100 (default 25).
        after: Forwarded unchanged to the client (presumably a paging cursor).
        before: Forwarded unchanged to the client (presumably a paging cursor).

    Returns:
        Whatever ``MetaClient.list_page_settings`` returns.

    Raises:
        HTTPException: 400 for an unknown ``page_context``; 502 when the
            Meta client reports an error.
    """
    # Only the conversion is guarded by ``except ValueError`` so that a
    # ValueError raised inside the client call is not misreported as a
    # caller mistake.
    try:
        context = PageContext(page_context)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") from exc
    try:
        return await MetaClient().list_page_settings(context, limit=bounded_limit(limit), after=after, before=before)
    except MetaClientError as exc:
        raise graph_error(exc) from exc
@router.post("/draft", dependencies=[Depends(require_internal_auth)])
async def create_draft(req: CreateDraftRequest, session: Session = Depends(get_session)):
    """Store a new draft together with its ``draft.created`` audit entry.

    The effective risk level is ``max_risk`` of the level supplied in the
    request and the one computed by ``classify_risk``. A draft whose risk is
    ``RiskLevel.blocked`` is stored as ``DraftStatus.rejected``; every other
    draft is stored as ``DraftStatus.needs_review``. Media attachments are
    validated only for the ``page_post`` channel; other channels store an
    empty attachment list.

    Args:
        req: Draft payload.
        session: Database session supplied by ``get_session``.

    Returns:
        A dict with ``draft_id``, ``status`` and ``risk_level``.

    Raises:
        HTTPException: 400 when media validation fails.
    """
    # Comments and page posts are the channels treated as public here.
    is_public = req.channel in {Channel.comment, Channel.page_post}
    risk = max_risk(req.risk_level, classify_risk(req.page_context, req.channel, req.draft_text, is_public))

    if req.channel == Channel.page_post:
        try:
            media_attachments = validate_page_post_media(
                [m.model_dump(exclude_none=True) for m in req.media_attachments],
                req.media_required,
            )
        except MediaValidationError as exc:
            raise HTTPException(status_code=400, detail=str(exc)) from exc
    else:
        media_attachments = []

    draft = Draft(
        page_context=req.page_context,
        source_event_id=req.source_event_id,
        channel=req.channel,
        target_object_id=req.target_object_id,
        recipient_psid=req.recipient_psid,
        draft_text=req.draft_text,
        sources=req.sources,
        media_attachments=media_attachments,
        media_required=req.media_required,
        publish_mode=req.publish_mode,
        scheduled_publish_time=req.scheduled_publish_time,
        risk_level=risk,
        status=DraftStatus.rejected if risk == RiskLevel.blocked else DraftStatus.needs_review,
        created_by=req.created_by,
    )
    session.add(draft)
    # flush() sends the INSERT so draft.id is available for the audit row
    # without committing yet; the draft and its audit entry are then
    # committed together, so a draft is never persisted without its audit
    # record.
    session.flush()

    scheduled_iso = req.scheduled_publish_time.isoformat() if req.scheduled_publish_time else None
    session.add(
        AuditLog(
            page_context=req.page_context,
            actor=req.created_by,
            action="draft.created",
            draft_id=draft.id,
            event_id=req.source_event_id,
            details={
                "risk_level": risk.value,
                "channel": req.channel.value,
                "media_required": req.media_required,
                "media_count": len(media_attachments),
                "publish_mode": req.publish_mode.value,
                "scheduled_publish_time": scheduled_iso,
            },
        )
    )
    session.commit()
    # Reload the draft after the commit before reading it for the response.
    session.refresh(draft)
    return {"draft_id": draft.id, "status": draft.status, "risk_level": draft.risk_level}
@router.post("/page_post/publish", dependencies=[Depends(require_internal_auth)])
async def publish_page_post(req: PublishPagePostRequest, session: Session = Depends(get_session)):
    """Validate, schedule or publish a page post and record an audit entry.

    The publish mode value decides both the audit action and the reported
    status: ``validation_only`` gives ``page_post.validated`` / ``validated``,
    ``scheduled`` gives ``page_post.scheduled`` / ``scheduled``, and any other
    mode gives ``page_post.published`` / ``published``.

    Args:
        req: Publish payload; ``approved_by`` must be set.
        session: Database session supplied by ``get_session``.

    Returns:
        A dict with ``status`` and the raw ``meta_response``.

    Raises:
        HTTPException: 403 without an approver, 400 when media validation
            fails, 502 when the Meta client reports an error.
    """
    approver = req.approved_by
    if not approver:
        raise HTTPException(status_code=403, detail="Human approval required")

    try:
        attachments = validate_page_post_media(
            [item.model_dump(exclude_none=True) for item in req.media_attachments],
            req.media_required,
        )
    except MediaValidationError as exc:
        raise HTTPException(status_code=400, detail=str(exc))

    try:
        response = await MetaClient().publish_page_post_with_media(
            req.page_context,
            req.message,
            attachments,
            req.publish_mode,
            req.scheduled_publish_time,
        )
    except MetaClientError as exc:
        raise HTTPException(status_code=502, detail=str(exc))

    mode = req.publish_mode.value
    if mode == "validation_only":
        action = "page_post.validated"
        status = "validated"
    elif mode == "scheduled":
        action = "page_post.scheduled"
        status = "scheduled"
    else:
        action = "page_post.published"
        status = "published"

    scheduled_for = req.scheduled_publish_time
    audit_details = {
        "meta_response": response,
        "sources": req.sources,
        "media_required": req.media_required,
        "media_count": len(attachments),
        "publish_mode": mode,
        "scheduled_publish_time": scheduled_for.isoformat() if scheduled_for else None,
    }
    session.add(AuditLog(page_context=req.page_context, actor=approver, action=action, details=audit_details))
    session.commit()
    return {"status": status, "meta_response": response}
@router.post("/reels/publish", dependencies=[Depends(require_internal_auth)])
async def publish_reel(req: PublishReelRequest, session: Session = Depends(get_session)):
    """Validate a reel video and, unless validation-only, publish it.

    In validation-only mode the Meta client is not called; a
    ``reel.validated`` audit entry is written and the validated video is
    returned. Otherwise the reel is published and a ``reel.published`` audit
    entry is written.

    Args:
        req: Reel payload; ``approved_by`` must be set.
        session: Database session supplied by ``get_session``.

    Returns:
        ``{"status": "validated", "video": ..., "publish_attempted": False}``
        in validation-only mode, else
        ``{"status": "published", "meta_response": ...}``.

    Raises:
        HTTPException: 403 without an approver, 400 when the video fails
            validation, 502 when the Meta client reports an error.
    """
    approver = req.approved_by
    if not approver:
        raise HTTPException(status_code=403, detail="Human approval required")

    try:
        video = validate_reel_asset(req.video_path)
    except MediaValidationError as exc:
        raise HTTPException(status_code=400, detail=str(exc))

    if not req.validation_only:
        try:
            response = await MetaClient().publish_reel(req.page_context, video, req.description, req.title)
        except MetaClientError as exc:
            raise HTTPException(status_code=502, detail=str(exc))
        published_entry = AuditLog(
            page_context=req.page_context,
            actor=approver,
            action="reel.published",
            details={"meta_response": response, "video": video, "sources": req.sources},
        )
        session.add(published_entry)
        session.commit()
        return {"status": "published", "meta_response": response}

    validated_entry = AuditLog(
        page_context=req.page_context,
        actor=approver,
        action="reel.validated",
        details={"video": video, "sources": req.sources},
    )
    session.add(validated_entry)
    session.commit()
    return {"status": "validated", "video": video, "publish_attempted": False}
@router.post("/messenger/send", dependencies=[Depends(require_internal_auth)])
async def send_messenger(req: SendMessengerRequest, session: Session = Depends(get_session)):
    """Send an approved Messenger message and record a ``messenger.sent`` audit entry.

    Args:
        req: Message payload; ``approved_by`` must be set.
        session: Database session supplied by ``get_session``.

    Returns:
        ``{"status": "sent", "meta_response": ...}``.

    Raises:
        HTTPException: 403 without an approver, 502 when the Meta client
            reports an error.
    """
    approver = req.approved_by
    if not approver:
        raise HTTPException(status_code=403, detail="Human approval required")

    client = MetaClient()
    try:
        response = await client.send_messenger_message(
            req.page_context,
            req.recipient_psid,
            req.message,
            req.messaging_type,
        )
    except MetaClientError as exc:
        raise HTTPException(status_code=502, detail=str(exc))

    entry = AuditLog(
        page_context=req.page_context,
        actor=approver,
        action="messenger.sent",
        details={"meta_response": response},
    )
    session.add(entry)
    session.commit()
    return {"status": "sent", "meta_response": response}
@router.get("/messenger/conversation", dependencies=[Depends(require_internal_auth)])
async def read_messenger_conversation(
    page_context: str,
    conversation_id: str,
    limit: int = Query(default=10, ge=1, le=50),
    after: str | None = None,
    before: str | None = None,
):
    """Read the detail of one Messenger conversation through the Meta client.

    Args:
        page_context: Name of the page context; must be a valid
            ``PageContext`` value.
        conversation_id: Identifier of the conversation to read.
        limit: Page size, 1-50.
        after: Forwarded unchanged to the client (presumably a paging cursor).
        before: Forwarded unchanged to the client (presumably a paging cursor).

    Returns:
        Whatever ``MetaClient.read_conversation_detail`` returns.

    Raises:
        HTTPException: 400 for an unknown ``page_context``; 502 when the
            Meta client reports an error.
    """
    # Only the conversion is guarded by ``except ValueError`` so that a
    # ValueError raised inside the client call is not misreported as a
    # caller mistake.
    try:
        context = PageContext(page_context)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") from exc
    try:
        return await MetaClient().read_conversation_detail(context, conversation_id, limit=bounded_limit(limit, 50), after=after, before=before)
    except MetaClientError as exc:
        raise graph_error(exc) from exc
@router.get("/messenger/response_window", dependencies=[Depends(require_internal_auth)])
async def messenger_response_window(page_context: str, conversation_id: str):
    """Query the Meta client for a conversation's Messenger response window.

    Args:
        page_context: Name of the page context; must be a valid
            ``PageContext`` value.
        conversation_id: Identifier of the conversation to inspect.

    Returns:
        Whatever ``MetaClient.messenger_response_window`` returns.

    Raises:
        HTTPException: 400 for an unknown ``page_context``; 502 when the
            Meta client reports an error.
    """
    # Only the conversion is guarded by ``except ValueError`` so that a
    # ValueError raised inside the client call is not misreported as a
    # caller mistake.
    try:
        context = PageContext(page_context)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") from exc
    try:
        return await MetaClient().messenger_response_window(context, conversation_id)
    except MetaClientError as exc:
        raise graph_error(exc) from exc
@router.post("/comment/reply", dependencies=[Depends(require_internal_auth)])
async def reply_to_comment(req: ReplyToCommentRequest, session: Session = Depends(get_session)):
    """Post an approved reply to a comment and record a ``comment.replied`` audit entry.

    Args:
        req: Reply payload; ``approved_by`` must be set.
        session: Database session supplied by ``get_session``.

    Returns:
        ``{"status": "replied", "meta_response": ...}``.

    Raises:
        HTTPException: 403 without an approver, 502 when the Meta client
            reports an error.
    """
    approver = req.approved_by
    if not approver:
        raise HTTPException(status_code=403, detail="Human approval required")

    client = MetaClient()
    try:
        response = await client.reply_to_comment(req.page_context, req.comment_id, req.message)
    except MetaClientError as exc:
        raise HTTPException(status_code=502, detail=str(exc))

    entry = AuditLog(
        page_context=req.page_context,
        actor=approver,
        action="comment.replied",
        details={"comment_id": req.comment_id, "meta_response": response},
    )
    session.add(entry)
    session.commit()
    return {"status": "replied", "meta_response": response}
@router.get("/comment/thread", dependencies=[Depends(require_internal_auth)])
async def read_comment_thread(
    page_context: str,
    object_id: str,
    limit: int = Query(default=10, ge=1, le=50),
    after: str | None = None,
    before: str | None = None,
):
    """Read the comment thread of an object through the Meta client.

    Args:
        page_context: Name of the page context; must be a valid
            ``PageContext`` value.
        object_id: Identifier of the object whose comments are read.
        limit: Page size, 1-50.
        after: Forwarded unchanged to the client (presumably a paging cursor).
        before: Forwarded unchanged to the client (presumably a paging cursor).

    Returns:
        Whatever ``MetaClient.read_comment_thread`` returns.

    Raises:
        HTTPException: 400 for an unknown ``page_context``; 502 when the
            Meta client reports an error.
    """
    # Only the conversion is guarded by ``except ValueError`` so that a
    # ValueError raised inside the client call is not misreported as a
    # caller mistake.
    try:
        context = PageContext(page_context)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") from exc
    try:
        return await MetaClient().read_comment_thread(context, object_id, limit=bounded_limit(limit, 50), after=after, before=before)
    except MetaClientError as exc:
        raise graph_error(exc) from exc
@router.post("/comment/moderate", dependencies=[Depends(require_internal_auth)])
async def moderate_comment(req: ModerateCommentRequest, session: Session = Depends(get_session)):
    """Apply an approved moderation action to a comment and audit it.

    The audit action is ``comment.<action>`` and the response status is the
    requested action itself.

    Args:
        req: Moderation payload; both ``approved_by`` and ``reason`` must be
            set.
        session: Database session supplied by ``get_session``.

    Returns:
        ``{"status": <requested action>, "meta_response": ...}``.

    Raises:
        HTTPException: 403 when the approver or the reason is missing, 502
            when the Meta client reports an error.
    """
    if not (req.approved_by and req.reason):
        raise HTTPException(status_code=403, detail="Human approval and reason required")

    client = MetaClient()
    try:
        response = await client.moderate_comment(req.page_context, req.comment_id, req.action)
    except MetaClientError as exc:
        raise HTTPException(status_code=502, detail=str(exc))

    entry = AuditLog(
        page_context=req.page_context,
        actor=req.approved_by,
        action=f"comment.{req.action}",
        details={"comment_id": req.comment_id, "reason": req.reason, "meta_response": response},
    )
    session.add(entry)
    session.commit()
    return {"status": req.action, "meta_response": response}