from fastapi import APIRouter, Depends, Header, HTTPException, Query from sqlmodel import Session from ..config import get_settings from ..db import get_session from ..models import Draft, DraftStatus, RiskLevel, AuditLog, Channel, PageContext from ..schemas import CreateDraftRequest, PublishPagePostRequest, PublishReelRequest, SendMessengerRequest, ReplyToCommentRequest, ModerateCommentRequest from ..services.meta_client import MetaClient, MetaClientError from ..services.media_validation import MediaValidationError, validate_page_post_media, validate_reel_asset from ..services.risk_rules import classify_risk, max_risk router = APIRouter(prefix="/tools/meta", tags=["meta-tools"]) VERIFIED_INSIGHT_METRICS = { "page_impressions_unique", "page_post_engagements", "page_views_total", "page_actions_post_reactions_total", "page_video_views", "page_total_actions", } def bounded_limit(limit: int, maximum: int = 100) -> int: return max(1, min(limit, maximum)) def graph_error(exc: MetaClientError) -> HTTPException: return HTTPException(status_code=502, detail=str(exc)) def require_internal_auth(x_meta_bridge_key: str | None = Header(default=None)): if x_meta_bridge_key != get_settings().meta_bridge_internal_api_key: raise HTTPException(status_code=401, detail="Unauthorized") @router.get("/page/posts", dependencies=[Depends(require_internal_auth)]) async def list_page_posts( page_context: str, edge: str = Query(default="posts", pattern="^(posts|feed|published_posts)$"), limit: int = Query(default=10, ge=1, le=100), after: str | None = None, before: str | None = None, ): try: return await MetaClient().list_page_posts(PageContext(page_context), edge=edge, limit=bounded_limit(limit), after=after, before=before) except ValueError: raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") except MetaClientError as exc: raise graph_error(exc) @router.get("/page/scheduled_posts", dependencies=[Depends(require_internal_auth)]) async def list_scheduled_page_posts( page_context: str, limit: int = Query(default=10, ge=1, le=100), after: str | None = None, before: str | None = None, ): try: return await MetaClient().list_page_posts(PageContext(page_context), edge="scheduled_posts", limit=bounded_limit(limit), after=after, before=before) except ValueError: raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") except MetaClientError as exc: raise graph_error(exc) @router.get("/page/media", dependencies=[Depends(require_internal_auth)]) async def list_page_media( page_context: str, media_type: str = Query(default="all", pattern="^(all|photos|videos|video_reels)$"), limit: int = Query(default=10, ge=1, le=100), after: str | None = None, before: str | None = None, ): try: return await MetaClient().list_page_media(PageContext(page_context), media_type=media_type, limit=bounded_limit(limit), after=after, before=before) except ValueError: raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") except MetaClientError as exc: raise graph_error(exc) @router.get("/conversations", dependencies=[Depends(require_internal_auth)]) async def list_conversations( page_context: str, limit: int = Query(default=10, ge=1, le=100), after: str | None = None, before: str | None = None, ): try: return await MetaClient().list_conversations(PageContext(page_context), limit=bounded_limit(limit), after=after, before=before) except ValueError: raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") except MetaClientError as exc: raise graph_error(exc) @router.get("/insights", dependencies=[Depends(require_internal_auth)]) async def get_page_insights( page_context: str, metrics: str | None = None, period: str = Query(default="day", pattern="^(day|week|days_28|month|lifetime)$"), since: str | None = None, until: str | None = None, ): requested = [m.strip() for m in metrics.split(",")] if metrics else sorted(VERIFIED_INSIGHT_METRICS) invalid = [m for m in requested if m not in VERIFIED_INSIGHT_METRICS] if invalid: raise HTTPException(status_code=400, detail={"invalid_metrics": invalid, "allowed_metrics": sorted(VERIFIED_INSIGHT_METRICS)}) try: return await MetaClient().get_page_insights(PageContext(page_context), metrics=requested, period=period, since=since, until=until) except ValueError: raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") except MetaClientError as exc: raise graph_error(exc) @router.get("/webhooks/subscriptions", dependencies=[Depends(require_internal_auth)]) async def list_webhook_subscriptions( page_context: str, limit: int = Query(default=10, ge=1, le=100), after: str | None = None, before: str | None = None, ): try: return await MetaClient().list_webhook_subscriptions(PageContext(page_context), limit=bounded_limit(limit), after=after, before=before) except ValueError: raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") except MetaClientError as exc: raise graph_error(exc) @router.get("/settings", dependencies=[Depends(require_internal_auth)]) async def list_page_settings( page_context: str, limit: int = Query(default=25, ge=1, le=100), after: str | None = None, before: str | None = None, ): try: return await MetaClient().list_page_settings(PageContext(page_context), limit=bounded_limit(limit), after=after, before=before) except ValueError: raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") except MetaClientError as exc: raise graph_error(exc) @router.post("/draft", dependencies=[Depends(require_internal_auth)]) async def create_draft(req: CreateDraftRequest, session: Session = Depends(get_session)): is_public = req.channel in {Channel.comment, Channel.page_post} risk = max_risk(req.risk_level, classify_risk(req.page_context, req.channel, req.draft_text, is_public)) try: media_attachments = validate_page_post_media([m.model_dump(exclude_none=True) for m in req.media_attachments], req.media_required) if req.channel == Channel.page_post else [] except MediaValidationError as exc: raise HTTPException(status_code=400, detail=str(exc)) draft = Draft(page_context=req.page_context, source_event_id=req.source_event_id, channel=req.channel, target_object_id=req.target_object_id, recipient_psid=req.recipient_psid, draft_text=req.draft_text, sources=req.sources, media_attachments=media_attachments, media_required=req.media_required, publish_mode=req.publish_mode, scheduled_publish_time=req.scheduled_publish_time, risk_level=risk, status=DraftStatus.rejected if risk == RiskLevel.blocked else DraftStatus.needs_review, created_by=req.created_by) session.add(draft); session.commit(); session.refresh(draft) session.add(AuditLog(page_context=req.page_context, actor=req.created_by, action="draft.created", draft_id=draft.id, event_id=req.source_event_id, details={"risk_level": risk.value, "channel": req.channel.value, "media_required": req.media_required, "media_count": len(media_attachments), "publish_mode": req.publish_mode.value, "scheduled_publish_time": req.scheduled_publish_time.isoformat() if req.scheduled_publish_time else None})) session.commit() return {"draft_id": draft.id, "status": draft.status, "risk_level": draft.risk_level} @router.post("/page_post/publish", dependencies=[Depends(require_internal_auth)]) async def publish_page_post(req: PublishPagePostRequest, session: Session = Depends(get_session)): if not req.approved_by: raise HTTPException(status_code=403, detail="Human approval required") try: media_attachments = validate_page_post_media([m.model_dump(exclude_none=True) for m in req.media_attachments], req.media_required) except MediaValidationError as exc: raise HTTPException(status_code=400, detail=str(exc)) try: meta_response = await MetaClient().publish_page_post_with_media(req.page_context, req.message, media_attachments, req.publish_mode, req.scheduled_publish_time) except MetaClientError as exc: raise HTTPException(status_code=502, detail=str(exc)) action = "page_post.validated" if req.publish_mode.value == "validation_only" else "page_post.scheduled" if req.publish_mode.value == "scheduled" else "page_post.published" session.add(AuditLog(page_context=req.page_context, actor=req.approved_by, action=action, details={"meta_response": meta_response, "sources": req.sources, "media_required": req.media_required, "media_count": len(media_attachments), "publish_mode": req.publish_mode.value, "scheduled_publish_time": req.scheduled_publish_time.isoformat() if req.scheduled_publish_time else None})) session.commit() status = "validated" if req.publish_mode.value == "validation_only" else "scheduled" if req.publish_mode.value == "scheduled" else "published" return {"status": status, "meta_response": meta_response} @router.post("/reels/publish", dependencies=[Depends(require_internal_auth)]) async def publish_reel(req: PublishReelRequest, session: Session = Depends(get_session)): if not req.approved_by: raise HTTPException(status_code=403, detail="Human approval required") try: video = validate_reel_asset(req.video_path) except MediaValidationError as exc: raise HTTPException(status_code=400, detail=str(exc)) if req.validation_only: session.add(AuditLog(page_context=req.page_context, actor=req.approved_by, action="reel.validated", details={"video": video, "sources": req.sources})) session.commit() return {"status": "validated", "video": video, "publish_attempted": False} try: meta_response = await MetaClient().publish_reel(req.page_context, video, req.description, req.title) except MetaClientError as exc: raise HTTPException(status_code=502, detail=str(exc)) session.add(AuditLog(page_context=req.page_context, actor=req.approved_by, action="reel.published", details={"meta_response": meta_response, "video": video, "sources": req.sources})) session.commit() return {"status": "published", "meta_response": meta_response} @router.post("/messenger/send", dependencies=[Depends(require_internal_auth)]) async def send_messenger(req: SendMessengerRequest, session: Session = Depends(get_session)): if not req.approved_by: raise HTTPException(status_code=403, detail="Human approval required") try: meta_response = await MetaClient().send_messenger_message(req.page_context, req.recipient_psid, req.message, req.messaging_type) except MetaClientError as exc: raise HTTPException(status_code=502, detail=str(exc)) session.add(AuditLog(page_context=req.page_context, actor=req.approved_by, action="messenger.sent", details={"meta_response": meta_response})) session.commit() return {"status": "sent", "meta_response": meta_response} @router.get("/messenger/conversation", dependencies=[Depends(require_internal_auth)]) async def read_messenger_conversation( page_context: str, conversation_id: str, limit: int = Query(default=10, ge=1, le=50), after: str | None = None, before: str | None = None, ): try: return await MetaClient().read_conversation_detail(PageContext(page_context), conversation_id, limit=bounded_limit(limit, 50), after=after, before=before) except ValueError: raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") except MetaClientError as exc: raise graph_error(exc) @router.get("/messenger/response_window", dependencies=[Depends(require_internal_auth)]) async def messenger_response_window(page_context: str, conversation_id: str): try: return await MetaClient().messenger_response_window(PageContext(page_context), conversation_id) except ValueError: raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") except MetaClientError as exc: raise graph_error(exc) @router.post("/comment/reply", dependencies=[Depends(require_internal_auth)]) async def reply_to_comment(req: ReplyToCommentRequest, session: Session = Depends(get_session)): if not req.approved_by: raise HTTPException(status_code=403, detail="Human approval required") try: meta_response = await MetaClient().reply_to_comment(req.page_context, req.comment_id, req.message) except MetaClientError as exc: raise HTTPException(status_code=502, detail=str(exc)) session.add(AuditLog(page_context=req.page_context, actor=req.approved_by, action="comment.replied", details={"comment_id": req.comment_id, "meta_response": meta_response})) session.commit() return {"status": "replied", "meta_response": meta_response} @router.get("/comment/thread", dependencies=[Depends(require_internal_auth)]) async def read_comment_thread( page_context: str, object_id: str, limit: int = Query(default=10, ge=1, le=50), after: str | None = None, before: str | None = None, ): try: return await MetaClient().read_comment_thread(PageContext(page_context), object_id, limit=bounded_limit(limit, 50), after=after, before=before) except ValueError: raise HTTPException(status_code=400, detail=f"Unsupported page_context: {page_context}") except MetaClientError as exc: raise graph_error(exc) @router.post("/comment/moderate", dependencies=[Depends(require_internal_auth)]) async def moderate_comment(req: ModerateCommentRequest, session: Session = Depends(get_session)): if not req.approved_by or not req.reason: raise HTTPException(status_code=403, detail="Human approval and reason required") try: meta_response = await MetaClient().moderate_comment(req.page_context, req.comment_id, req.action) except MetaClientError as exc: raise HTTPException(status_code=502, detail=str(exc)) session.add(AuditLog(page_context=req.page_context, actor=req.approved_by, action=f"comment.{req.action}", details={"comment_id": req.comment_id, "reason": req.reason, "meta_response": meta_response})) session.commit() return {"status": req.action, "meta_response": meta_response}