Ashraf Al-Kassem Claude Opus 4.6 commited on
Commit
eed40fb
·
1 Parent(s): f187462

feat: Mission 31 — Meta Platform: Instagram DM, Messenger, and Lead Ads Full Lifecycle

Browse files

- OAuth 2.0 connect flow (start + callback with JWT state, token exchange chain)
- Webhook subscription API (auto-subscribe after OAuth, manual trigger endpoint)
- Message normalization: Instagram DM + Messenger channel discrimination, attachment parsing
- Meta delivery/read status webhooks via watermark-based updates
- Lead Ads ingestion: fetch lead data from Graph API, enrich contacts
- MetaAdapter: send_media implementation + get_user_profile
- Dispatch service: Meta media dispatch (mirrors WhatsApp pattern)
- Channel-level observability metrics (instagram/messenger discrimination)
- 21 new tests (288 total), all passing + 15/15 E2E checks

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

backend/alembic/versions/k1l2m3n4o5p6_mission_31_meta_platform.py ADDED
@@ -0,0 +1,21 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Mission 31 — Meta Platform: Instagram DM, Messenger, Lead Ads
2
+
3
+ Revision ID: k1l2m3n4o5p6
4
+ Revises: j0k1l2m3n4o5
5
+ Create Date: 2026-03-01
6
+ """
7
+ from alembic import op
8
+ import sqlalchemy as sa
9
+
10
+ revision = "k1l2m3n4o5p6"
11
+ down_revision = "j0k1l2m3n4o5"
12
+ branch_labels = None
13
+ depends_on = None
14
+
15
+
16
+ def upgrade() -> None:
17
+ op.add_column("channelidentity", sa.Column("channel", sa.String, nullable=True))
18
+
19
+
20
+ def downgrade() -> None:
21
+ op.drop_column("channelidentity", "channel")
backend/app/api/v1/integrations.py CHANGED
@@ -185,3 +185,42 @@ async def integrations_health_check(
185
  )
186
  await db.commit()
187
  return wrap_data({"status": "ok", "checked_count": len(integrations), "errors_found": errors_found})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
  )
186
  await db.commit()
187
  return wrap_data({"status": "ok", "checked_count": len(integrations), "errors_found": errors_found})
188
+
189
+ @router.post("/meta/webhooks/subscribe", response_model=ResponseEnvelope[dict], dependencies=[Depends(require_module_enabled(MODULE_INTEGRATIONS_CONNECT, "write")), Depends(require_entitlement("integrations_connect"))])
190
+ async def subscribe_meta_webhooks(
191
+ request: Request,
192
+ db: AsyncSession = Depends(get_db),
193
+ workspace: Workspace = Depends(deps.get_active_workspace),
194
+ current_user: User = Depends(deps.get_current_user),
195
+ ) -> Any:
196
+ """Manually trigger webhook subscription for a connected Meta integration."""
197
+ result = await db.execute(
198
+ select(Integration).where(
199
+ Integration.workspace_id == workspace.id,
200
+ Integration.provider == "meta",
201
+ Integration.status == IntegrationStatus.CONNECTED,
202
+ )
203
+ )
204
+ integration = result.scalars().first()
205
+ if not integration or not integration.encrypted_config:
206
+ return wrap_error("No connected Meta integration found")
207
+
208
+ config = decrypt_data(integration.encrypted_config)
209
+ page_id = integration.provider_workspace_id
210
+ page_access_token = config.get("access_token")
211
+
212
+ from app.services.meta_service import subscribe_page_webhooks
213
+ success = await subscribe_page_webhooks(page_id, page_access_token)
214
+
215
+ await audit_event(
216
+ db, action="meta_webhook_subscribe", entity_type="integration",
217
+ entity_id=str(integration.id), actor_user_id=current_user.id,
218
+ outcome="success" if success else "failure",
219
+ workspace_id=workspace.id, request=request,
220
+ metadata={"provider": "meta", "page_id": page_id},
221
+ )
222
+ await db.commit()
223
+
224
+ if success:
225
+ return wrap_data({"message": "Webhook subscription successful", "page_id": page_id})
226
+ return wrap_error("Failed to subscribe to webhooks")
backend/app/api/v1/meta_oauth.py ADDED
@@ -0,0 +1,260 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Meta OAuth 2.0 connect flow — start and callback endpoints."""
2
+ import logging
3
+ from datetime import datetime, timedelta
4
+ from urllib.parse import urlencode
5
+
6
+ import httpx
7
+ from fastapi import APIRouter, Depends, Request
8
+ from fastapi.responses import RedirectResponse
9
+ from jose import jwt, JWTError
10
+ from sqlalchemy.ext.asyncio import AsyncSession
11
+ from sqlalchemy.exc import IntegrityError
12
+ from sqlmodel import select
13
+
14
+ from app.api import deps
15
+ from app.core.config import settings
16
+ from app.core.db import get_db
17
+ from app.core.modules import require_module_enabled, MODULE_INTEGRATIONS_CONNECT
18
+ from app.core.security import encrypt_data
19
+ from app.models.models import Integration, IntegrationStatus, User, Workspace
20
+ from app.schemas.envelope import wrap_data, wrap_error
21
+ from app.services.audit_service import audit_event
22
+ from app.services.entitlements import require_entitlement
23
+ from app.services.meta_service import subscribe_page_webhooks
24
+ from app.services.metrics_service import metrics
25
+ from app.services.runtime_event_service import log_event
26
+
27
+ logger = logging.getLogger(__name__)
28
+ router = APIRouter()
29
+
30
+ GRAPH_API_VERSION = "v18.0"
31
+ GRAPH_BASE = f"https://graph.facebook.com/{GRAPH_API_VERSION}"
32
+ META_SCOPES = "pages_messaging,pages_manage_metadata,instagram_manage_messages,leads_retrieval,pages_read_engagement"
33
+
34
+
35
+ @router.get(
36
+ "/start",
37
+ dependencies=[
38
+ Depends(require_module_enabled(MODULE_INTEGRATIONS_CONNECT, "write")),
39
+ Depends(require_entitlement("integrations_connect")),
40
+ ],
41
+ )
42
+ async def meta_oauth_start(
43
+ db: AsyncSession = Depends(get_db),
44
+ workspace: Workspace = Depends(deps.get_active_workspace),
45
+ current_user: User = Depends(deps.get_current_user),
46
+ ):
47
+ """Build Meta authorization URL and return it."""
48
+ if not settings.META_CLIENT_ID or not settings.META_CLIENT_SECRET:
49
+ return wrap_error("META_CLIENT_ID and META_CLIENT_SECRET must be configured")
50
+
51
+ redirect_uri = settings.META_OAUTH_REDIRECT_URI
52
+ if not redirect_uri:
53
+ base = settings.APP_BASE_URL or settings.FRONTEND_URL
54
+ redirect_uri = f"{base}/api/v1/integrations/meta/oauth/callback"
55
+
56
+ # Encode workspace + user into JWT state param (10-min expiry)
57
+ state_payload = {
58
+ "sub": str(current_user.id),
59
+ "workspace_id": str(workspace.id),
60
+ "exp": datetime.utcnow() + timedelta(minutes=10),
61
+ "purpose": "meta_oauth",
62
+ }
63
+ state_token = jwt.encode(
64
+ state_payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM
65
+ )
66
+
67
+ params = {
68
+ "client_id": settings.META_CLIENT_ID,
69
+ "redirect_uri": redirect_uri,
70
+ "scope": META_SCOPES,
71
+ "state": state_token,
72
+ "response_type": "code",
73
+ }
74
+ auth_url = f"https://www.facebook.com/{GRAPH_API_VERSION}/dialog/oauth?{urlencode(params)}"
75
+
76
+ metrics.increment("meta_oauth_started")
77
+ await log_event(
78
+ db, event_type="meta.oauth.started", source="meta_oauth",
79
+ workspace_id=workspace.id,
80
+ )
81
+ await db.commit()
82
+
83
+ return wrap_data({"auth_url": auth_url})
84
+
85
+
86
+ @router.get("/callback")
87
+ async def meta_oauth_callback(
88
+ request: Request,
89
+ db: AsyncSession = Depends(get_db),
90
+ ):
91
+ """Exchange authorization code for token, subscribe webhooks, store integration."""
92
+ code = request.query_params.get("code")
93
+ state = request.query_params.get("state")
94
+ error = request.query_params.get("error")
95
+ frontend_base = settings.FRONTEND_URL
96
+
97
+ if error or not code or not state:
98
+ metrics.increment("meta_oauth_failed")
99
+ return RedirectResponse(
100
+ f"{frontend_base}/settings/integrations?oauth=meta&status=error&reason={error or 'missing_params'}"
101
+ )
102
+
103
+ # Decode state JWT
104
+ try:
105
+ state_data = jwt.decode(
106
+ state, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]
107
+ )
108
+ if state_data.get("purpose") != "meta_oauth":
109
+ raise JWTError("Invalid state purpose")
110
+ user_id = state_data["sub"]
111
+ workspace_id = state_data["workspace_id"]
112
+ except JWTError:
113
+ metrics.increment("meta_oauth_failed")
114
+ return RedirectResponse(
115
+ f"{frontend_base}/settings/integrations?oauth=meta&status=error&reason=invalid_state"
116
+ )
117
+
118
+ redirect_uri = settings.META_OAUTH_REDIRECT_URI
119
+ if not redirect_uri:
120
+ base = settings.APP_BASE_URL or settings.FRONTEND_URL
121
+ redirect_uri = f"{base}/api/v1/integrations/meta/oauth/callback"
122
+
123
+ try:
124
+ async with httpx.AsyncClient() as client:
125
+ # 1. Exchange code for short-lived user token
126
+ token_resp = await client.get(
127
+ f"{GRAPH_BASE}/oauth/access_token",
128
+ params={
129
+ "client_id": settings.META_CLIENT_ID,
130
+ "redirect_uri": redirect_uri,
131
+ "client_secret": settings.META_CLIENT_SECRET,
132
+ "code": code,
133
+ },
134
+ )
135
+ if token_resp.status_code != 200:
136
+ raise Exception(f"Token exchange failed: {token_resp.text}")
137
+ short_token = token_resp.json().get("access_token")
138
+
139
+ # 2. Exchange for long-lived token
140
+ ll_resp = await client.get(
141
+ f"{GRAPH_BASE}/oauth/access_token",
142
+ params={
143
+ "grant_type": "fb_exchange_token",
144
+ "client_id": settings.META_CLIENT_ID,
145
+ "client_secret": settings.META_CLIENT_SECRET,
146
+ "fb_exchange_token": short_token,
147
+ },
148
+ )
149
+ if ll_resp.status_code != 200:
150
+ raise Exception(f"Long-lived token exchange failed: {ll_resp.text}")
151
+ long_lived_user_token = ll_resp.json().get("access_token")
152
+
153
+ # 3. Fetch pages
154
+ pages_resp = await client.get(
155
+ f"{GRAPH_BASE}/me/accounts",
156
+ params={"access_token": long_lived_user_token},
157
+ )
158
+ if pages_resp.status_code != 200:
159
+ raise Exception(f"Page fetch failed: {pages_resp.text}")
160
+ pages = pages_resp.json().get("data", [])
161
+ if not pages:
162
+ raise Exception("No pages found for this user")
163
+
164
+ page = pages[0]
165
+ page_id = page["id"]
166
+ page_access_token = page["access_token"]
167
+ page_name = page.get("name", "")
168
+
169
+ # 4. Fetch Instagram Business Account (optional)
170
+ ig_account_id = None
171
+ try:
172
+ ig_resp = await client.get(
173
+ f"{GRAPH_BASE}/{page_id}",
174
+ params={
175
+ "fields": "instagram_business_account",
176
+ "access_token": page_access_token,
177
+ },
178
+ )
179
+ if ig_resp.status_code == 200:
180
+ ig_data = ig_resp.json().get("instagram_business_account", {})
181
+ ig_account_id = ig_data.get("id")
182
+ except Exception:
183
+ logger.warning("Could not fetch Instagram Business Account ID")
184
+
185
+ # 5. Subscribe webhooks
186
+ await subscribe_page_webhooks(page_id, page_access_token)
187
+
188
+ # 6. Store Integration
189
+ config_data = {
190
+ "access_token": page_access_token,
191
+ "page_name": page_name,
192
+ "instagram_business_account_id": ig_account_id,
193
+ }
194
+ encrypted_config = encrypt_data(config_data)
195
+
196
+ from uuid import UUID
197
+ ws_id = UUID(workspace_id)
198
+
199
+ result = await db.execute(
200
+ select(Integration).where(
201
+ Integration.workspace_id == ws_id,
202
+ Integration.provider == "meta",
203
+ )
204
+ )
205
+ integration = result.scalars().first()
206
+
207
+ if not integration:
208
+ integration = Integration(workspace_id=ws_id, provider="meta")
209
+ db.add(integration)
210
+
211
+ integration.status = IntegrationStatus.CONNECTED
212
+ integration.encrypted_config = encrypted_config
213
+ integration.provider_workspace_id = page_id
214
+ integration.connected_at = datetime.utcnow()
215
+ integration.last_checked_at = datetime.utcnow()
216
+ integration.last_error = None
217
+
218
+ try:
219
+ await db.commit()
220
+ except IntegrityError:
221
+ await db.rollback()
222
+ metrics.increment("meta_oauth_failed")
223
+ return RedirectResponse(
224
+ f"{frontend_base}/settings/integrations?oauth=meta&status=error&reason=page_already_linked"
225
+ )
226
+
227
+ await audit_event(
228
+ db, action="integration_connect", entity_type="integration",
229
+ entity_id=str(integration.id), actor_user_id=UUID(user_id),
230
+ outcome="success", workspace_id=ws_id, request=request,
231
+ metadata={"provider": "meta", "method": "oauth", "page_name": page_name},
232
+ )
233
+ await db.commit()
234
+
235
+ metrics.increment("meta_oauth_completed")
236
+ await log_event(
237
+ db, event_type="meta.oauth.completed", source="meta_oauth",
238
+ workspace_id=ws_id,
239
+ payload={"page_id": page_id, "page_name": page_name},
240
+ )
241
+ await db.commit()
242
+
243
+ return RedirectResponse(
244
+ f"{frontend_base}/settings/integrations?oauth=meta&status=success"
245
+ )
246
+
247
+ except Exception as e:
248
+ logger.exception(f"Meta OAuth callback error: {e}")
249
+ metrics.increment("meta_oauth_failed")
250
+ try:
251
+ await log_event(
252
+ db, event_type="meta.oauth.failed", source="meta_oauth",
253
+ outcome="failure", error_message=str(e),
254
+ )
255
+ await db.commit()
256
+ except Exception:
257
+ pass
258
+ return RedirectResponse(
259
+ f"{frontend_base}/settings/integrations?oauth=meta&status=error&reason=callback_error"
260
+ )
backend/app/api/v1/webhooks.py CHANGED
@@ -224,9 +224,12 @@ async def meta_webhook(
224
  except (IndexError, AttributeError):
225
  pass
226
 
227
- metrics.increment("webhooks_received", labels={"provider": "meta"})
 
 
 
228
  await log_event(db, event_type="webhook.received", source="webhook",
229
- payload={"provider": "meta", "page_id": page_id})
230
 
231
  # 3. Resolve Workspace
232
  workspace_id = await resolve_workspace(db, "meta", page_id) if page_id else None
 
224
  except (IndexError, AttributeError):
225
  pass
226
 
227
+ # Channel discrimination for metrics
228
+ object_type = payload.get("object", "")
229
+ channel = "instagram" if object_type == "instagram" else "messenger"
230
+ metrics.increment("webhooks_received", labels={"provider": "meta", "channel": channel})
231
  await log_event(db, event_type="webhook.received", source="webhook",
232
+ payload={"provider": "meta", "page_id": page_id, "channel": channel})
233
 
234
  # 3. Resolve Workspace
235
  workspace_id = await resolve_workspace(db, "meta", page_id) if page_id else None
backend/app/core/config.py CHANGED
@@ -36,6 +36,7 @@ class Settings(BaseSettings):
36
  META_CLIENT_ID: Optional[str] = None
37
  META_CLIENT_SECRET: Optional[str] = None
38
  META_APP_SECRET: Optional[str] = None
 
39
  WHATSAPP_VERIFY_TOKEN: Optional[str] = None
40
  ZOHO_CLIENT_ID: Optional[str] = None
41
  ZOHO_CLIENT_SECRET: Optional[str] = None
 
36
  META_CLIENT_ID: Optional[str] = None
37
  META_CLIENT_SECRET: Optional[str] = None
38
  META_APP_SECRET: Optional[str] = None
39
+ META_OAUTH_REDIRECT_URI: Optional[str] = None
40
  WHATSAPP_VERIFY_TOKEN: Optional[str] = None
41
  ZOHO_CLIENT_ID: Optional[str] = None
42
  ZOHO_CLIENT_SECRET: Optional[str] = None
backend/app/integrations/meta/adapter.py CHANGED
@@ -64,5 +64,61 @@ class MetaAdapter(MessagingAdapter):
64
  async def send_media(
65
  self, to: str, media_type: str, media_url: str, caption: Optional[str] = None
66
  ) -> str:
67
- """Media sending not yet implemented for Meta."""
68
- raise NotImplementedError("Meta media sending not yet supported")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
  async def send_media(
65
  self, to: str, media_type: str, media_url: str, caption: Optional[str] = None
66
  ) -> str:
67
+ """Send a media message via Meta Graph API (Messenger/Instagram)."""
68
+ headers = {"Content-Type": "application/json"}
69
+ params = {"access_token": self.page_access_token}
70
+ payload = {
71
+ "recipient": {"id": to},
72
+ "message": {
73
+ "attachment": {
74
+ "type": media_type,
75
+ "payload": {"url": media_url, "is_reusable": True},
76
+ }
77
+ },
78
+ "messaging_type": "RESPONSE",
79
+ }
80
+
81
+ async with httpx.AsyncClient() as client:
82
+ try:
83
+ response = await client.post(
84
+ self.base_url, headers=headers, params=params, json=payload
85
+ )
86
+ data = response.json()
87
+
88
+ if response.status_code == 200:
89
+ provider_message_id = data.get("message_id")
90
+ logger.info(f"Meta media sent: {provider_message_id}")
91
+ return provider_message_id
92
+
93
+ error = data.get("error", {})
94
+ error_code = error.get("code")
95
+ error_message = error.get("message", "Unknown error")
96
+ logger.error(f"Meta API media error: {error_message} (Code: {error_code})")
97
+
98
+ if response.status_code in [400, 401, 403]:
99
+ raise Exception(f"PERMANENT_FAILURE: {error_message} (Code: {error_code})")
100
+ if response.status_code in [429, 500, 502, 503, 504]:
101
+ raise Exception(f"TRANSIENT_FAILURE: {error_message} (Code: {error_code})")
102
+ raise Exception(f"UNCLASSIFIED_FAILURE: {error_message} (Status: {response.status_code})")
103
+
104
+ except httpx.RequestError as e:
105
+ logger.error(f"Meta Network error (media): {str(e)}")
106
+ raise Exception(f"TRANSIENT_FAILURE: Network error: {str(e)}")
107
+
108
+ async def get_user_profile(self, user_id: str) -> dict:
109
+ """Fetch user name and profile pic from Meta Graph API. Non-critical."""
110
+ url = f"https://graph.facebook.com/v18.0/{user_id}"
111
+ params = {
112
+ "fields": "first_name,last_name,profile_pic",
113
+ "access_token": self.page_access_token,
114
+ }
115
+ async with httpx.AsyncClient() as client:
116
+ try:
117
+ response = await client.get(url, params=params)
118
+ if response.status_code == 200:
119
+ return response.json()
120
+ logger.warning(f"Failed to fetch profile for {user_id}: {response.status_code}")
121
+ return {}
122
+ except httpx.RequestError as e:
123
+ logger.warning(f"Network error fetching profile: {e}")
124
+ return {}
backend/app/models/models.py CHANGED
@@ -268,6 +268,7 @@ class ChannelIdentity(BaseIDModel, table=True):
268
  contact_id: UUID = Field(foreign_key="contact.id")
269
  provider: str # whatsapp, meta, etc
270
  provider_user_id: str # phone number or fb id
 
271
 
272
  class Conversation(WorkspaceScopedModel, table=True):
273
  __table_args__ = (
 
268
  contact_id: UUID = Field(foreign_key="contact.id")
269
  provider: str # whatsapp, meta, etc
270
  provider_user_id: str # phone number or fb id
271
+ channel: Optional[str] = Field(default=None) # instagram, messenger (sub-channel of meta)
272
 
273
  class Conversation(WorkspaceScopedModel, table=True):
274
  __table_args__ = (
backend/app/services/dispatch_service.py CHANGED
@@ -227,7 +227,12 @@ class DispatchService:
227
  page_id=integration.provider_workspace_id,
228
  page_access_token=config.get("access_token")
229
  )
230
- provider_message_id = await adapter.send_text(recipient_id, msg.content)
 
 
 
 
 
231
  else:
232
  raise Exception(f"Unsupported platform: {msg.platform}")
233
 
 
227
  page_id=integration.provider_workspace_id,
228
  page_access_token=config.get("access_token")
229
  )
230
+ if media_url and media_type:
231
+ provider_message_id = await adapter.send_media(
232
+ recipient_id, media_type, media_url, caption=media_caption
233
+ )
234
+ else:
235
+ provider_message_id = await adapter.send_text(recipient_id, msg.content)
236
  else:
237
  raise Exception(f"Unsupported platform: {msg.platform}")
238
 
backend/app/services/meta_service.py ADDED
@@ -0,0 +1,111 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Meta Graph API service — centralized helpers for OAuth, webhook subscription, Lead Ads."""
2
+ import httpx
3
+ import logging
4
+
5
+ from app.services.metrics_service import metrics
6
+
7
+ logger = logging.getLogger(__name__)
8
+
9
+ GRAPH_API_VERSION = "v18.0"
10
+ GRAPH_BASE_URL = f"https://graph.facebook.com/{GRAPH_API_VERSION}"
11
+
12
+
13
+ async def subscribe_page_webhooks(page_id: str, page_access_token: str) -> bool:
14
+ """Subscribe a page to webhook fields for messaging + lead ads."""
15
+ url = f"{GRAPH_BASE_URL}/{page_id}/subscribed_apps"
16
+ params = {"access_token": page_access_token}
17
+ data = {"subscribed_fields": "messages,messaging_postbacks,feed,leadgen"}
18
+
19
+ async with httpx.AsyncClient() as client:
20
+ try:
21
+ response = await client.post(url, params=params, data=data)
22
+ if response.status_code == 200 and response.json().get("success"):
23
+ metrics.increment("meta_webhook_subscribed")
24
+ logger.info(f"Subscribed page {page_id} to webhooks")
25
+ return True
26
+ logger.error(f"Webhook subscription failed for {page_id}: {response.text}")
27
+ return False
28
+ except httpx.RequestError as e:
29
+ logger.error(f"Network error subscribing webhooks for {page_id}: {e}")
30
+ return False
31
+
32
+
33
+ async def verify_page_subscription(page_id: str, page_access_token: str) -> dict:
34
+ """Check current webhook subscriptions for a page."""
35
+ url = f"{GRAPH_BASE_URL}/{page_id}/subscribed_apps"
36
+ params = {"access_token": page_access_token}
37
+
38
+ async with httpx.AsyncClient() as client:
39
+ try:
40
+ response = await client.get(url, params=params)
41
+ if response.status_code == 200:
42
+ return response.json()
43
+ return {"error": response.text}
44
+ except httpx.RequestError as e:
45
+ return {"error": str(e)}
46
+
47
+
48
+ async def fetch_lead_data(leadgen_id: str, page_access_token: str) -> dict:
49
+ """Fetch lead submission data from Meta Graph API."""
50
+ url = f"{GRAPH_BASE_URL}/{leadgen_id}"
51
+ params = {"access_token": page_access_token}
52
+
53
+ async with httpx.AsyncClient() as client:
54
+ try:
55
+ response = await client.get(url, params=params)
56
+ if response.status_code == 200:
57
+ return response.json()
58
+ error_msg = response.text
59
+ logger.error(f"Failed to fetch lead {leadgen_id}: {error_msg}")
60
+ metrics.increment("meta_leads_failed")
61
+ raise Exception(f"Failed to fetch lead data: {error_msg}")
62
+ except httpx.RequestError as e:
63
+ logger.error(f"Network error fetching lead {leadgen_id}: {e}")
64
+ metrics.increment("meta_leads_failed")
65
+ raise Exception(f"Network error fetching lead: {e}")
66
+
67
+
68
+ def parse_lead_fields(field_data: list) -> dict:
69
+ """Normalize Meta lead field_data array into a flat dict."""
70
+ result = {}
71
+ field_map = {
72
+ "email": "email",
73
+ "phone_number": "phone",
74
+ "full_name": "full_name",
75
+ "first_name": "first_name",
76
+ "last_name": "last_name",
77
+ }
78
+
79
+ for field in field_data:
80
+ name = field.get("name", "").lower()
81
+ values = field.get("values", [])
82
+ if name in field_map and values:
83
+ result[field_map[name]] = values[0]
84
+
85
+ # Split full_name into first/last if separate fields missing
86
+ if "full_name" in result and "first_name" not in result:
87
+ parts = result["full_name"].split(" ", 1)
88
+ result["first_name"] = parts[0]
89
+ result["last_name"] = parts[1] if len(parts) > 1 else ""
90
+
91
+ return result
92
+
93
+
94
+ async def get_user_profile(user_id: str, page_access_token: str) -> dict:
95
+ """Fetch user profile from Meta Graph API. Non-critical — returns {} on failure."""
96
+ url = f"{GRAPH_BASE_URL}/{user_id}"
97
+ params = {
98
+ "fields": "first_name,last_name,profile_pic",
99
+ "access_token": page_access_token,
100
+ }
101
+
102
+ async with httpx.AsyncClient() as client:
103
+ try:
104
+ response = await client.get(url, params=params)
105
+ if response.status_code == 200:
106
+ return response.json()
107
+ logger.warning(f"Failed to fetch profile for {user_id}: {response.status_code}")
108
+ return {}
109
+ except httpx.RequestError as e:
110
+ logger.warning(f"Network error fetching profile for {user_id}: {e}")
111
+ return {}
backend/app/workers/tasks.py CHANGED
@@ -42,6 +42,11 @@ def parse_webhook_payload(provider: str, payload: Dict) -> Dict:
42
  "media_type": None,
43
  "mime_type": None,
44
  "caption": None,
 
 
 
 
 
45
  }
46
 
47
  try:
@@ -77,18 +82,50 @@ def parse_webhook_payload(provider: str, payload: Dict) -> Dict:
77
 
78
  elif provider == "meta":
79
  entry = payload.get("entry", [])[0]
 
 
 
 
 
 
 
 
80
  # Messaging (Messenger/Instagram)
81
  if "messaging" in entry:
82
  msg_event = entry["messaging"][0]
83
- normalized["trigger_type"] = "MESSAGE_INBOUND"
84
- normalized["provider_user_id"] = msg_event.get("sender", {}).get("id")
85
- normalized["content"] = msg_event.get("message", {}).get("text")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
86
  # Lead Ads
87
  elif "changes" in entry:
88
  change = entry["changes"][0]
89
  if change.get("field") == "leadgen":
90
  normalized["trigger_type"] = "LEAD_AD_SUBMIT"
91
- normalized["provider_user_id"] = change.get("value", {}).get("leadgen_id")
 
 
92
  normalized["content"] = "Lead Ad Submission"
93
  except (IndexError, KeyError, AttributeError):
94
  pass
@@ -163,6 +200,64 @@ async def _process_status_updates(session, statuses: list, workspace_id, correla
163
  related_ids={"message_id": str(msg.id)},
164
  payload={"status": wa_status, "provider_message_id": provider_msg_id})
165
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
166
  @celery_app.task(name="app.workers.tasks.process_webhook_event")
167
  def process_webhook_event(event_id: str):
168
  """
@@ -218,6 +313,18 @@ def process_webhook_event(event_id: str):
218
  await session.commit()
219
  return
220
 
 
 
 
 
 
 
 
 
 
 
 
 
221
  if not info["trigger_type"]:
222
  logger.info(f"No actionable trigger found in payload for event {event_id}")
223
  event.status = WebhookStatus.PROCESSED
@@ -240,13 +347,16 @@ def process_webhook_event(event_id: str):
240
 
241
  if info["trigger_type"] == "MESSAGE_INBOUND":
242
  msg_metadata = {}
 
 
 
 
 
243
  if info.get("media_id"):
244
- msg_metadata = {
245
- "media_id": info["media_id"],
246
- "media_type": info["media_type"],
247
- "mime_type": info["mime_type"],
248
- }
249
- # Download and store media
250
  try:
251
  from app.services.media_service import download_and_store_media
252
  media_path = await download_and_store_media(
@@ -269,6 +379,73 @@ def process_webhook_event(event_id: str):
269
  )
270
  session.add(inbound_msg)
271
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
272
  # 3. Find Matching Published Flow (prefer published_version_id pointer)
273
  flow_version = None
274
  flow_query = select(Flow).where(
 
42
  "media_type": None,
43
  "mime_type": None,
44
  "caption": None,
45
+ "channel": None, # instagram | messenger | lead_ads
46
+ "attachment_url": None,
47
+ "leadgen_id": None,
48
+ "meta_delivery_watermark": None,
49
+ "meta_read_watermark": None,
50
  }
51
 
52
  try:
 
82
 
83
  elif provider == "meta":
84
  entry = payload.get("entry", [])[0]
85
+
86
+ # Channel discrimination: instagram vs messenger
87
+ object_type = payload.get("object", "")
88
+ if object_type == "instagram":
89
+ normalized["channel"] = "instagram"
90
+ else:
91
+ normalized["channel"] = "messenger"
92
+
93
  # Messaging (Messenger/Instagram)
94
  if "messaging" in entry:
95
  msg_event = entry["messaging"][0]
96
+
97
+ # Delivery receipts (watermark-based)
98
+ if "delivery" in msg_event:
99
+ normalized["trigger_type"] = "META_STATUS_UPDATE"
100
+ normalized["meta_delivery_watermark"] = msg_event["delivery"].get("watermark")
101
+ elif "read" in msg_event:
102
+ normalized["trigger_type"] = "META_STATUS_UPDATE"
103
+ normalized["meta_read_watermark"] = msg_event["read"].get("watermark")
104
+ elif msg_event.get("message"):
105
+ # Standard inbound message
106
+ normalized["trigger_type"] = "MESSAGE_INBOUND"
107
+ normalized["provider_user_id"] = msg_event.get("sender", {}).get("id")
108
+ message = msg_event.get("message", {})
109
+ normalized["content"] = message.get("text")
110
+
111
+ # Attachment parsing
112
+ attachments = message.get("attachments", [])
113
+ if attachments:
114
+ att = attachments[0]
115
+ att_type = att.get("type")
116
+ normalized["media_type"] = att_type
117
+ normalized["attachment_url"] = att.get("payload", {}).get("url")
118
+ if not normalized["content"]:
119
+ normalized["content"] = f"[{att_type}]"
120
+
121
  # Lead Ads
122
  elif "changes" in entry:
123
  change = entry["changes"][0]
124
  if change.get("field") == "leadgen":
125
  normalized["trigger_type"] = "LEAD_AD_SUBMIT"
126
+ normalized["leadgen_id"] = change.get("value", {}).get("leadgen_id")
127
+ normalized["provider_user_id"] = normalized["leadgen_id"]
128
+ normalized["channel"] = "lead_ads"
129
  normalized["content"] = "Lead Ad Submission"
130
  except (IndexError, KeyError, AttributeError):
131
  pass
 
200
  related_ids={"message_id": str(msg.id)},
201
  payload={"status": wa_status, "provider_message_id": provider_msg_id})
202
 
203
+
204
+ async def _process_meta_status_updates(session, info: dict, workspace_id, correlation_id: str):
205
+ """Process Meta delivery/read status webhooks using watermark timestamps."""
206
+ from app.services.runtime_event_service import log_event
207
+ from sqlmodel import or_
208
+
209
+ delivery_watermark = info.get("meta_delivery_watermark")
210
+ read_watermark = info.get("meta_read_watermark")
211
+
212
+ if delivery_watermark:
213
+ watermark_ts = datetime.utcfromtimestamp(int(delivery_watermark) / 1000)
214
+ result = await session.execute(
215
+ select(Message).where(
216
+ Message.workspace_id == workspace_id,
217
+ Message.platform == "meta",
218
+ Message.direction == "outbound",
219
+ Message.delivery_status == DeliveryStatus.SENT,
220
+ Message.sent_at < watermark_ts,
221
+ )
222
+ )
223
+ for msg in result.scalars().all():
224
+ old_priority = _STATUS_PRIORITY.get(msg.delivery_status, 0)
225
+ new_priority = _STATUS_PRIORITY.get(DeliveryStatus.DELIVERED, 0)
226
+ if new_priority > old_priority:
227
+ msg.delivery_status = DeliveryStatus.DELIVERED
228
+ msg.delivered_at = watermark_ts
229
+ session.add(msg)
230
+
231
+ if read_watermark:
232
+ watermark_ts = datetime.utcfromtimestamp(int(read_watermark) / 1000)
233
+ result = await session.execute(
234
+ select(Message).where(
235
+ Message.workspace_id == workspace_id,
236
+ Message.platform == "meta",
237
+ Message.direction == "outbound",
238
+ or_(
239
+ Message.delivery_status == DeliveryStatus.SENT,
240
+ Message.delivery_status == DeliveryStatus.DELIVERED,
241
+ ),
242
+ Message.sent_at < watermark_ts,
243
+ )
244
+ )
245
+ for msg in result.scalars().all():
246
+ old_priority = _STATUS_PRIORITY.get(msg.delivery_status, 0)
247
+ new_priority = _STATUS_PRIORITY.get(DeliveryStatus.READ, 0)
248
+ if new_priority > old_priority:
249
+ msg.delivery_status = DeliveryStatus.READ
250
+ msg.read_at = watermark_ts
251
+ if not msg.delivered_at:
252
+ msg.delivered_at = watermark_ts
253
+ session.add(msg)
254
+
255
+ await log_event(session, event_type="message.meta_status_updated", source="webhook",
256
+ workspace_id=workspace_id, correlation_id=correlation_id,
257
+ payload={"delivery_watermark": delivery_watermark,
258
+ "read_watermark": read_watermark})
259
+
260
+
261
  @celery_app.task(name="app.workers.tasks.process_webhook_event")
262
  def process_webhook_event(event_id: str):
263
  """
 
313
  await session.commit()
314
  return
315
 
316
+ # 1a2. Handle Meta delivery/read status updates (watermark-based)
317
+ if info.get("trigger_type") == "META_STATUS_UPDATE":
318
+ await _process_meta_status_updates(
319
+ session, info, event.workspace_id,
320
+ str(event.correlation_id)
321
+ )
322
+ event.status = WebhookStatus.PROCESSED
323
+ event.processed_at = datetime.utcnow()
324
+ session.add(event)
325
+ await session.commit()
326
+ return
327
+
328
  if not info["trigger_type"]:
329
  logger.info(f"No actionable trigger found in payload for event {event_id}")
330
  event.status = WebhookStatus.PROCESSED
 
347
 
348
  if info["trigger_type"] == "MESSAGE_INBOUND":
349
  msg_metadata = {}
350
+ if info.get("channel"):
351
+ msg_metadata["channel"] = info["channel"]
352
+ if info.get("attachment_url"):
353
+ msg_metadata["attachment_url"] = info["attachment_url"]
354
+ msg_metadata["media_type"] = info.get("media_type")
355
  if info.get("media_id"):
356
+ msg_metadata["media_id"] = info["media_id"]
357
+ msg_metadata["media_type"] = info["media_type"]
358
+ msg_metadata["mime_type"] = info["mime_type"]
359
+ # Download and store media (WhatsApp)
 
 
360
  try:
361
  from app.services.media_service import download_and_store_media
362
  media_path = await download_and_store_media(
 
379
  )
380
  session.add(inbound_msg)
381
 
382
+ # 2b. Lead Ads: fetch lead data and enrich contact
383
+ if info["trigger_type"] == "LEAD_AD_SUBMIT" and info.get("leadgen_id"):
384
+ from app.services.meta_service import fetch_lead_data, parse_lead_fields
385
+ from app.models.models import Integration
386
+ from app.core.security import decrypt_data
387
+ from app.services.metrics_service import metrics
388
+
389
+ integration_result = await session.execute(
390
+ select(Integration).where(
391
+ Integration.workspace_id == event.workspace_id,
392
+ Integration.provider == "meta"
393
+ )
394
+ )
395
+ integration = integration_result.scalars().first()
396
+ if integration and integration.encrypted_config:
397
+ try:
398
+ config = decrypt_data(integration.encrypted_config)
399
+ lead_raw = await fetch_lead_data(
400
+ info["leadgen_id"], config.get("access_token")
401
+ )
402
+ lead_fields = parse_lead_fields(
403
+ lead_raw.get("field_data", [])
404
+ )
405
+
406
+ # Enrich contact metadata
407
+ updated_meta = dict(contact.additional_metadata or {})
408
+ if lead_fields.get("email"):
409
+ updated_meta["email"] = lead_fields["email"]
410
+ if lead_fields.get("phone"):
411
+ updated_meta["phone"] = lead_fields["phone"]
412
+ updated_meta["lead_source"] = "meta_lead_ad"
413
+ updated_meta["leadgen_id"] = info["leadgen_id"]
414
+ contact.additional_metadata = updated_meta
415
+
416
+ if lead_fields.get("first_name") and not contact.first_name:
417
+ contact.first_name = lead_fields["first_name"]
418
+ if lead_fields.get("last_name") and not contact.last_name:
419
+ contact.last_name = lead_fields["last_name"]
420
+ session.add(contact)
421
+
422
+ # Store lead as message
423
+ lead_msg = Message(
424
+ workspace_id=event.workspace_id,
425
+ conversation_id=conversation.id,
426
+ direction="inbound",
427
+ content=f"Lead Ad: {lead_fields.get('full_name', lead_fields.get('first_name', 'Unknown'))}",
428
+ platform="meta",
429
+ delivery_status="delivered",
430
+ additional_metadata={
431
+ "type": "lead_ad_submission",
432
+ "leadgen_id": info["leadgen_id"],
433
+ "lead_fields": lead_fields,
434
+ "channel": "lead_ads",
435
+ },
436
+ )
437
+ session.add(lead_msg)
438
+ metrics.increment("meta_leads_ingested")
439
+ await log_event(
440
+ session, event_type="meta.lead_ingested",
441
+ source="webhook",
442
+ workspace_id=event.workspace_id,
443
+ correlation_id=str(event.correlation_id),
444
+ payload={"leadgen_id": info["leadgen_id"]},
445
+ )
446
+ except Exception as lead_err:
447
+ logger.warning(f"Lead data fetch failed for {info['leadgen_id']}: {lead_err}")
448
+
449
  # 3. Find Matching Published Flow (prefer published_version_id pointer)
450
  flow_version = None
451
  flow_query = select(Flow).where(
backend/main.py CHANGED
@@ -21,6 +21,7 @@ from app.api.v1.qualification import router as qualification_router
21
  from app.api.v1.entitlements import router as entitlements_router
22
  from app.api.v1.settings_profile import router as settings_profile_router
23
  from app.api.v1.qualification_criteria import router as qual_criteria_router
 
24
  from fastapi import HTTPException
25
  import uuid
26
  import logging
@@ -167,6 +168,7 @@ app.include_router(qualification_router, prefix=f"{settings.API_V1_STR}/qualific
167
  app.include_router(entitlements_router, prefix=f"{settings.API_V1_STR}/entitlements", tags=["entitlements"])
168
  app.include_router(settings_profile_router, prefix=f"{settings.API_V1_STR}/settings", tags=["settings-profile"])
169
  app.include_router(qual_criteria_router, prefix=f"{settings.API_V1_STR}/prompt-studio", tags=["prompt-studio"])
 
170
 
171
  @app.get("/")
172
  async def root():
 
21
  from app.api.v1.entitlements import router as entitlements_router
22
  from app.api.v1.settings_profile import router as settings_profile_router
23
  from app.api.v1.qualification_criteria import router as qual_criteria_router
24
+ from app.api.v1.meta_oauth import router as meta_oauth_router
25
  from fastapi import HTTPException
26
  import uuid
27
  import logging
 
168
  app.include_router(entitlements_router, prefix=f"{settings.API_V1_STR}/entitlements", tags=["entitlements"])
169
  app.include_router(settings_profile_router, prefix=f"{settings.API_V1_STR}/settings", tags=["settings-profile"])
170
  app.include_router(qual_criteria_router, prefix=f"{settings.API_V1_STR}/prompt-studio", tags=["prompt-studio"])
171
+ app.include_router(meta_oauth_router, prefix=f"{settings.API_V1_STR}/integrations/meta/oauth", tags=["integrations"])
172
 
173
  @app.get("/")
174
  async def root():
backend/tests/test_meta_platform.py ADDED
@@ -0,0 +1,644 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Mission 31 — Meta Platform: Instagram DM, Messenger, and Lead Ads Tests.
3
+ Covers: OAuth flow, webhook subscription, message parsing (Instagram/Messenger),
4
+ delivery status watermarks, Lead Ads ingestion, MetaAdapter, and observability.
5
+ """
6
+ import json
7
+ import time
8
+ import pytest
9
+ import pytest_asyncio
10
+ from datetime import datetime, timedelta
11
+ from uuid import uuid4
12
+ from unittest.mock import patch, AsyncMock, MagicMock
13
+
14
+ from httpx import AsyncClient
15
+ from sqlalchemy.ext.asyncio import AsyncSession
16
+
17
+ from app.models.models import (
18
+ User, Workspace, WorkspaceMember, WorkspaceRole,
19
+ Message, DeliveryStatus, Integration, IntegrationStatus,
20
+ Contact, ChannelIdentity, Conversation,
21
+ )
22
+ from app.core.security import get_password_hash, create_access_token, encrypt_data
23
+ from app.workers.tasks import parse_webhook_payload, _process_meta_status_updates
24
+ from app.services.meta_service import parse_lead_fields, subscribe_page_webhooks
25
+ from app.services.metrics_service import MetricsService
26
+
27
+
28
+ # ── Helpers ──────────────────────────────────────────────────────────────
29
+
30
+
31
+ async def _setup_workspace(db: AsyncSession, provider="meta"):
32
+ """Create a user + workspace + integration for testing."""
33
+ user = User(
34
+ email=f"meta_test_{uuid4().hex[:6]}@test.com",
35
+ hashed_password=get_password_hash("testpass123"),
36
+ full_name="Meta Test User",
37
+ is_active=True,
38
+ is_superuser=False,
39
+ email_verified_at=datetime.utcnow(),
40
+ )
41
+ db.add(user)
42
+ await db.flush()
43
+
44
+ ws = Workspace(name="Meta Test Workspace")
45
+ db.add(ws)
46
+ await db.flush()
47
+
48
+ mem = WorkspaceMember(user_id=user.id, workspace_id=ws.id, role=WorkspaceRole.OWNER)
49
+ db.add(mem)
50
+
51
+ integration = Integration(
52
+ workspace_id=ws.id,
53
+ provider=provider,
54
+ status=IntegrationStatus.CONNECTED,
55
+ provider_workspace_id="PAGE_123",
56
+ encrypted_config=encrypt_data({"access_token": "mock_page_token"}),
57
+ )
58
+ db.add(integration)
59
+ await db.flush()
60
+
61
+ return user, ws, integration
62
+
63
+
64
+ def _messenger_text_payload(page_id="PAGE_123", sender_id="PSID_456", text="Hello from Messenger"):
65
+ """Build a Messenger webhook payload."""
66
+ return {
67
+ "object": "page",
68
+ "entry": [{
69
+ "id": page_id,
70
+ "time": int(datetime.utcnow().timestamp() * 1000),
71
+ "messaging": [{
72
+ "sender": {"id": sender_id},
73
+ "recipient": {"id": page_id},
74
+ "timestamp": int(datetime.utcnow().timestamp() * 1000),
75
+ "message": {
76
+ "mid": f"mid.{uuid4().hex[:12]}",
77
+ "text": text,
78
+ },
79
+ }],
80
+ }],
81
+ }
82
+
83
+
84
+ def _instagram_dm_payload(ig_id="IG_PAGE_123", sender_id="IGSID_789", text="Hello from Instagram"):
85
+ """Build an Instagram DM webhook payload."""
86
+ return {
87
+ "object": "instagram",
88
+ "entry": [{
89
+ "id": ig_id,
90
+ "time": int(datetime.utcnow().timestamp() * 1000),
91
+ "messaging": [{
92
+ "sender": {"id": sender_id},
93
+ "recipient": {"id": ig_id},
94
+ "timestamp": int(datetime.utcnow().timestamp() * 1000),
95
+ "message": {
96
+ "mid": f"mid.{uuid4().hex[:12]}",
97
+ "text": text,
98
+ },
99
+ }],
100
+ }],
101
+ }
102
+
103
+
104
+ def _instagram_attachment_payload(ig_id="IG_PAGE_123", sender_id="IGSID_789"):
105
+ """Build an Instagram DM payload with an image attachment."""
106
+ return {
107
+ "object": "instagram",
108
+ "entry": [{
109
+ "id": ig_id,
110
+ "time": int(datetime.utcnow().timestamp() * 1000),
111
+ "messaging": [{
112
+ "sender": {"id": sender_id},
113
+ "recipient": {"id": ig_id},
114
+ "timestamp": int(datetime.utcnow().timestamp() * 1000),
115
+ "message": {
116
+ "mid": f"mid.{uuid4().hex[:12]}",
117
+ "attachments": [{
118
+ "type": "image",
119
+ "payload": {"url": "https://example.com/photo.jpg"},
120
+ }],
121
+ },
122
+ }],
123
+ }],
124
+ }
125
+
126
+
127
+ def _messenger_echo_payload(page_id="PAGE_123"):
128
+ """Build a Messenger echo payload (sent by the page itself)."""
129
+ return {
130
+ "object": "page",
131
+ "entry": [{
132
+ "id": page_id,
133
+ "time": int(datetime.utcnow().timestamp() * 1000),
134
+ "messaging": [{
135
+ "sender": {"id": page_id},
136
+ "recipient": {"id": "PSID_456"},
137
+ "timestamp": int(datetime.utcnow().timestamp() * 1000),
138
+ "message": {
139
+ "mid": f"mid.{uuid4().hex[:12]}",
140
+ "text": "Echo message",
141
+ "is_echo": True,
142
+ },
143
+ }],
144
+ }],
145
+ }
146
+
147
+
148
+ def _delivery_receipt_payload(page_id="PAGE_123", watermark=None):
149
+ """Build a Meta delivery receipt payload."""
150
+ if watermark is None:
151
+ watermark = int(datetime.utcnow().timestamp() * 1000)
152
+ return {
153
+ "object": "page",
154
+ "entry": [{
155
+ "id": page_id,
156
+ "time": int(datetime.utcnow().timestamp() * 1000),
157
+ "messaging": [{
158
+ "sender": {"id": page_id},
159
+ "recipient": {"id": "PSID_456"},
160
+ "timestamp": int(datetime.utcnow().timestamp() * 1000),
161
+ "delivery": {
162
+ "mids": ["mid.abc123"],
163
+ "watermark": watermark,
164
+ },
165
+ }],
166
+ }],
167
+ }
168
+
169
+
170
+ def _read_receipt_payload(page_id="PAGE_123", watermark=None):
171
+ """Build a Meta read receipt payload."""
172
+ if watermark is None:
173
+ watermark = int(datetime.utcnow().timestamp() * 1000)
174
+ return {
175
+ "object": "page",
176
+ "entry": [{
177
+ "id": page_id,
178
+ "time": int(datetime.utcnow().timestamp() * 1000),
179
+ "messaging": [{
180
+ "sender": {"id": "PSID_456"},
181
+ "recipient": {"id": page_id},
182
+ "timestamp": int(datetime.utcnow().timestamp() * 1000),
183
+ "read": {
184
+ "watermark": watermark,
185
+ },
186
+ }],
187
+ }],
188
+ }
189
+
190
+
191
+ def _leadgen_payload(page_id="PAGE_123", leadgen_id="LEAD_999"):
192
+ """Build a Lead Ads webhook payload."""
193
+ return {
194
+ "object": "page",
195
+ "entry": [{
196
+ "id": page_id,
197
+ "time": int(datetime.utcnow().timestamp() * 1000),
198
+ "changes": [{
199
+ "field": "leadgen",
200
+ "value": {
201
+ "leadgen_id": leadgen_id,
202
+ "page_id": page_id,
203
+ "form_id": "FORM_123",
204
+ "created_time": int(datetime.utcnow().timestamp()),
205
+ },
206
+ }],
207
+ }],
208
+ }
209
+
210
+
211
+ # ── Section 1: OAuth Flow ────────────────────────────────────────────────
212
+
213
+
214
+ @pytest.mark.asyncio
215
+ async def test_meta_oauth_start_returns_auth_url(async_client: AsyncClient, db_session: AsyncSession):
216
+ """GET /start returns a valid Meta authorization URL."""
217
+ user, ws, _ = await _setup_workspace(db_session)
218
+ token = create_access_token(str(user.id), workspace_id=str(ws.id))
219
+
220
+ with patch("app.api.v1.meta_oauth.settings") as mock_settings:
221
+ mock_settings.META_CLIENT_ID = "test_client_id"
222
+ mock_settings.META_CLIENT_SECRET = "test_secret"
223
+ mock_settings.META_OAUTH_REDIRECT_URI = "https://example.com/callback"
224
+ mock_settings.JWT_SECRET = "test_jwt_secret"
225
+ mock_settings.JWT_ALGORITHM = "HS256"
226
+ mock_settings.FRONTEND_URL = "http://localhost:3000"
227
+ mock_settings.APP_BASE_URL = None
228
+
229
+ resp = await async_client.get(
230
+ "/api/v1/integrations/meta/oauth/start",
231
+ headers={"Authorization": f"Bearer {token}", "X-Workspace-ID": str(ws.id)},
232
+ )
233
+
234
+ assert resp.status_code == 200
235
+ data = resp.json()
236
+ assert data["success"] is True
237
+ auth_url = data["data"]["auth_url"]
238
+ assert "facebook.com" in auth_url
239
+ assert "client_id=test_client_id" in auth_url
240
+ assert "response_type=code" in auth_url
241
+
242
+
243
+ @pytest.mark.asyncio
244
+ async def test_meta_oauth_start_no_credentials(async_client: AsyncClient, db_session: AsyncSession):
245
+ """GET /start returns error when META_CLIENT_ID is not configured."""
246
+ user, ws, _ = await _setup_workspace(db_session)
247
+ token = create_access_token(str(user.id), workspace_id=str(ws.id))
248
+
249
+ with patch("app.api.v1.meta_oauth.settings") as mock_settings:
250
+ mock_settings.META_CLIENT_ID = None
251
+ mock_settings.META_CLIENT_SECRET = None
252
+
253
+ resp = await async_client.get(
254
+ "/api/v1/integrations/meta/oauth/start",
255
+ headers={"Authorization": f"Bearer {token}", "X-Workspace-ID": str(ws.id)},
256
+ )
257
+
258
+ assert resp.status_code == 200
259
+ data = resp.json()
260
+ assert data["success"] is False
261
+ assert "configured" in data["error"].lower()
262
+
263
+
264
+ @pytest.mark.asyncio
265
+ async def test_meta_oauth_callback_success(async_client: AsyncClient, db_session: AsyncSession):
266
+ """GET /callback with valid code exchanges tokens and creates Integration."""
267
+ from jose import jwt as jose_jwt
268
+ from app.core.config import settings
269
+
270
+ user, ws, _ = await _setup_workspace(db_session, provider="whatsapp") # no meta integration yet
271
+
272
+ state = jose_jwt.encode(
273
+ {"sub": str(user.id), "workspace_id": str(ws.id),
274
+ "exp": datetime.utcnow() + timedelta(minutes=10), "purpose": "meta_oauth"},
275
+ settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM,
276
+ )
277
+
278
+ mock_responses = [
279
+ # 1. Short-lived token exchange
280
+ MagicMock(status_code=200, json=lambda: {"access_token": "short_token"}),
281
+ # 2. Long-lived token exchange
282
+ MagicMock(status_code=200, json=lambda: {"access_token": "long_lived_token"}),
283
+ # 3. Fetch pages
284
+ MagicMock(status_code=200, json=lambda: {"data": [{"id": "PAGE_NEW", "access_token": "page_token", "name": "My Page"}]}),
285
+ # 4. Fetch IG account
286
+ MagicMock(status_code=200, json=lambda: {"instagram_business_account": {"id": "IG_BIZ_123"}}),
287
+ # 5. Subscribe webhooks
288
+ MagicMock(status_code=200, json=lambda: {"success": True}),
289
+ ]
290
+
291
+ call_count = {"i": 0}
292
+ original_get = None
293
+ original_post = None
294
+
295
+ async def mock_request(self, method, url, **kwargs):
296
+ idx = call_count["i"]
297
+ call_count["i"] += 1
298
+ if idx < len(mock_responses):
299
+ return mock_responses[idx]
300
+ return MagicMock(status_code=200, json=lambda: {})
301
+
302
+ with patch("httpx.AsyncClient.get", new=lambda self, url, **kw: mock_request(self, "GET", url, **kw)):
303
+ with patch("httpx.AsyncClient.post", new=lambda self, url, **kw: mock_request(self, "POST", url, **kw)):
304
+ resp = await async_client.get(
305
+ f"/api/v1/integrations/meta/oauth/callback?code=test_code&state={state}",
306
+ )
307
+
308
+ # Should redirect to frontend
309
+ # (httpx follows redirects by default in our test client)
310
+ assert resp.status_code == 200 or resp.status_code == 307 or resp.status_code == 302
311
+
312
+
313
+ # ── Section 2: Webhook Subscription ──────────────────────────────────────
314
+
315
+
316
+ @pytest.mark.asyncio
317
+ async def test_subscribe_page_webhooks_success():
318
+ """subscribe_page_webhooks returns True on 200 + success."""
319
+ mock_resp = MagicMock(status_code=200, json=lambda: {"success": True})
320
+
321
+ with patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=mock_resp):
322
+ result = await subscribe_page_webhooks("PAGE_123", "token_abc")
323
+
324
+ assert result is True
325
+
326
+
327
+ @pytest.mark.asyncio
328
+ async def test_subscribe_page_webhooks_failure():
329
+ """subscribe_page_webhooks returns False on error."""
330
+ mock_resp = MagicMock(status_code=400, text="Bad Request", json=lambda: {"error": "bad"})
331
+
332
+ with patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=mock_resp):
333
+ result = await subscribe_page_webhooks("PAGE_123", "token_abc")
334
+
335
+ assert result is False
336
+
337
+
338
+ # ── Section 3: Message Parsing — Instagram vs Messenger ──────────────────
339
+
340
+
341
+ @pytest.mark.asyncio
342
+ async def test_parse_instagram_dm_text():
343
+ """Instagram DM text payload → channel=instagram, correct content."""
344
+ payload = _instagram_dm_payload(text="Hi from IG")
345
+ info = parse_webhook_payload("meta", payload)
346
+
347
+ assert info["trigger_type"] == "MESSAGE_INBOUND"
348
+ assert info["channel"] == "instagram"
349
+ assert info["provider_user_id"] == "IGSID_789"
350
+ assert info["content"] == "Hi from IG"
351
+
352
+
353
+ @pytest.mark.asyncio
354
+ async def test_parse_messenger_text():
355
+ """Messenger text payload → channel=messenger, correct content."""
356
+ payload = _messenger_text_payload(text="Hi from Messenger")
357
+ info = parse_webhook_payload("meta", payload)
358
+
359
+ assert info["trigger_type"] == "MESSAGE_INBOUND"
360
+ assert info["channel"] == "messenger"
361
+ assert info["provider_user_id"] == "PSID_456"
362
+ assert info["content"] == "Hi from Messenger"
363
+
364
+
365
+ @pytest.mark.asyncio
366
+ async def test_parse_instagram_attachment():
367
+ """Instagram DM with image attachment → media_type + attachment_url extracted."""
368
+ payload = _instagram_attachment_payload()
369
+ info = parse_webhook_payload("meta", payload)
370
+
371
+ assert info["trigger_type"] == "MESSAGE_INBOUND"
372
+ assert info["channel"] == "instagram"
373
+ assert info["media_type"] == "image"
374
+ assert info["attachment_url"] == "https://example.com/photo.jpg"
375
+ assert info["content"] == "[image]"
376
+
377
+
378
+ @pytest.mark.asyncio
379
+ async def test_parse_messenger_echo_detected():
380
+ """Messenger echo payload — is_echo is not handled in parse but in process_webhook_event."""
381
+ payload = _messenger_echo_payload()
382
+ # parse_webhook_payload still returns MESSAGE_INBOUND for echoes
383
+ # The echo detection happens in process_webhook_event (line 192-204)
384
+ info = parse_webhook_payload("meta", payload)
385
+ assert info["trigger_type"] == "MESSAGE_INBOUND"
386
+ assert info["channel"] == "messenger"
387
+
388
+ # Verify the echo flag is in the raw payload for process_webhook_event to detect
389
+ entry = payload["entry"][0]
390
+ assert entry["messaging"][0]["message"]["is_echo"] is True
391
+
392
+
393
+ # ── Section 4: Meta Delivery Status (Watermark) ─────────────────────────
394
+
395
+
396
+ @pytest.mark.asyncio
397
+ async def test_meta_delivery_watermark_updates(db_session: AsyncSession):
398
+ """Delivery watermark updates SENT messages to DELIVERED."""
399
+ user, ws, integration = await _setup_workspace(db_session)
400
+
401
+ # Use time.time() for proper Unix epoch (avoids utcnow().timestamp() local TZ mismatch)
402
+ now_epoch = time.time()
403
+ sent_time = datetime.utcfromtimestamp(now_epoch - 300) # 5 min ago
404
+ msg = Message(
405
+ workspace_id=ws.id,
406
+ conversation_id=uuid4(),
407
+ direction="outbound",
408
+ content="Test message",
409
+ platform="meta",
410
+ delivery_status=DeliveryStatus.SENT,
411
+ sent_at=sent_time,
412
+ )
413
+ db_session.add(msg)
414
+ await db_session.flush()
415
+
416
+ # Watermark is 1 minute ago (after sent_time)
417
+ watermark = int((now_epoch - 60) * 1000)
418
+ info = {"meta_delivery_watermark": watermark, "meta_read_watermark": None}
419
+
420
+ await _process_meta_status_updates(
421
+ db_session, info, ws.id, str(uuid4())
422
+ )
423
+ await db_session.flush()
424
+
425
+ await db_session.refresh(msg)
426
+ assert msg.delivery_status == DeliveryStatus.DELIVERED
427
+ assert msg.delivered_at is not None
428
+
429
+
430
+ @pytest.mark.asyncio
431
+ async def test_meta_read_watermark_updates(db_session: AsyncSession):
432
+ """Read watermark updates DELIVERED messages to READ."""
433
+ user, ws, integration = await _setup_workspace(db_session)
434
+
435
+ now_epoch = time.time()
436
+ sent_time = datetime.utcfromtimestamp(now_epoch - 300)
437
+ msg = Message(
438
+ workspace_id=ws.id,
439
+ conversation_id=uuid4(),
440
+ direction="outbound",
441
+ content="Test read",
442
+ platform="meta",
443
+ delivery_status=DeliveryStatus.DELIVERED,
444
+ sent_at=sent_time,
445
+ delivered_at=datetime.utcfromtimestamp(now_epoch - 240),
446
+ )
447
+ db_session.add(msg)
448
+ await db_session.flush()
449
+
450
+ watermark = int((now_epoch - 60) * 1000)
451
+ info = {"meta_delivery_watermark": None, "meta_read_watermark": watermark}
452
+
453
+ await _process_meta_status_updates(
454
+ db_session, info, ws.id, str(uuid4())
455
+ )
456
+ await db_session.flush()
457
+
458
+ await db_session.refresh(msg)
459
+ assert msg.delivery_status == DeliveryStatus.READ
460
+ assert msg.read_at is not None
461
+
462
+
463
+ @pytest.mark.asyncio
464
+ async def test_meta_status_no_regression(db_session: AsyncSession):
465
+ """READ message should not regress to DELIVERED."""
466
+ user, ws, integration = await _setup_workspace(db_session)
467
+
468
+ now_epoch = time.time()
469
+ sent_time = datetime.utcfromtimestamp(now_epoch - 300)
470
+ msg = Message(
471
+ workspace_id=ws.id,
472
+ conversation_id=uuid4(),
473
+ direction="outbound",
474
+ content="Already read",
475
+ platform="meta",
476
+ delivery_status=DeliveryStatus.READ,
477
+ sent_at=sent_time,
478
+ delivered_at=datetime.utcfromtimestamp(now_epoch - 240),
479
+ read_at=datetime.utcfromtimestamp(now_epoch - 180),
480
+ )
481
+ db_session.add(msg)
482
+ await db_session.flush()
483
+
484
+ # Delivery watermark arrives after the message was already READ
485
+ watermark = int(now_epoch * 1000)
486
+ info = {"meta_delivery_watermark": watermark, "meta_read_watermark": None}
487
+
488
+ await _process_meta_status_updates(
489
+ db_session, info, ws.id, str(uuid4())
490
+ )
491
+ await db_session.flush()
492
+
493
+ await db_session.refresh(msg)
494
+ # Should still be READ, not regressed to DELIVERED
495
+ assert msg.delivery_status == DeliveryStatus.READ
496
+
497
+
498
+ # ── Section 5: Lead Ads Ingestion ────────────────────────────────────────
499
+
500
+
501
+ @pytest.mark.asyncio
502
+ async def test_parse_lead_fields_standard():
503
+ """parse_lead_fields normalizes email + phone + full_name."""
504
+ field_data = [
505
+ {"name": "email", "values": ["john@example.com"]},
506
+ {"name": "phone_number", "values": ["+14155551234"]},
507
+ {"name": "full_name", "values": ["John Doe"]},
508
+ ]
509
+ result = parse_lead_fields(field_data)
510
+
511
+ assert result["email"] == "john@example.com"
512
+ assert result["phone"] == "+14155551234"
513
+ assert result["full_name"] == "John Doe"
514
+ assert result["first_name"] == "John"
515
+ assert result["last_name"] == "Doe"
516
+
517
+
518
+ @pytest.mark.asyncio
519
+ async def test_parse_lead_fields_name_split():
520
+ """full_name only (no separate first/last) → correctly splits."""
521
+ field_data = [
522
+ {"name": "full_name", "values": ["Jane Smith-Doe"]},
523
+ ]
524
+ result = parse_lead_fields(field_data)
525
+
526
+ assert result["first_name"] == "Jane"
527
+ assert result["last_name"] == "Smith-Doe"
528
+
529
+
530
+ @pytest.mark.asyncio
531
+ async def test_parse_leadgen_webhook():
532
+ """Leadgen webhook payload → trigger_type=LEAD_AD_SUBMIT, leadgen_id extracted."""
533
+ payload = _leadgen_payload(leadgen_id="LEAD_XYZ")
534
+ info = parse_webhook_payload("meta", payload)
535
+
536
+ assert info["trigger_type"] == "LEAD_AD_SUBMIT"
537
+ assert info["leadgen_id"] == "LEAD_XYZ"
538
+ assert info["channel"] == "lead_ads"
539
+ assert info["provider_user_id"] == "LEAD_XYZ"
540
+
541
+
542
+ # ── Section 6: MetaAdapter ───────────────────────────────────────────────
543
+
544
+
545
+ @pytest.mark.asyncio
546
+ async def test_meta_send_media_success():
547
+ """MetaAdapter.send_media returns provider_message_id on 200."""
548
+ from app.integrations.meta.adapter import MetaAdapter
549
+
550
+ adapter = MetaAdapter(page_id="PAGE_123", page_access_token="token_abc")
551
+ mock_resp = MagicMock(
552
+ status_code=200,
553
+ json=lambda: {"message_id": "mid.media_xyz"},
554
+ )
555
+
556
+ with patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=mock_resp):
557
+ result = await adapter.send_media("PSID_456", "image", "https://example.com/img.jpg")
558
+
559
+ assert result == "mid.media_xyz"
560
+
561
+
562
+ @pytest.mark.asyncio
563
+ async def test_meta_send_media_permanent_failure():
564
+ """MetaAdapter.send_media raises PERMANENT_FAILURE on 400."""
565
+ from app.integrations.meta.adapter import MetaAdapter
566
+
567
+ adapter = MetaAdapter(page_id="PAGE_123", page_access_token="token_abc")
568
+ mock_resp = MagicMock(
569
+ status_code=400,
570
+ json=lambda: {"error": {"code": 100, "message": "Invalid attachment"}},
571
+ )
572
+
573
+ with patch("httpx.AsyncClient.post", new_callable=AsyncMock, return_value=mock_resp):
574
+ with pytest.raises(Exception, match="PERMANENT_FAILURE"):
575
+ await adapter.send_media("PSID_456", "image", "https://example.com/bad.jpg")
576
+
577
+
578
+ @pytest.mark.asyncio
579
+ async def test_meta_get_user_profile():
580
+ """MetaAdapter.get_user_profile returns profile on success, {} on failure."""
581
+ from app.integrations.meta.adapter import MetaAdapter
582
+
583
+ adapter = MetaAdapter(page_id="PAGE_123", page_access_token="token_abc")
584
+
585
+ # Success case
586
+ success_resp = MagicMock(
587
+ status_code=200,
588
+ json=lambda: {"first_name": "John", "last_name": "Doe"},
589
+ )
590
+ with patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=success_resp):
591
+ profile = await adapter.get_user_profile("PSID_456")
592
+ assert profile["first_name"] == "John"
593
+
594
+ # Failure case
595
+ fail_resp = MagicMock(status_code=403, text="Forbidden")
596
+ with patch("httpx.AsyncClient.get", new_callable=AsyncMock, return_value=fail_resp):
597
+ profile = await adapter.get_user_profile("PSID_456")
598
+ assert profile == {}
599
+
600
+
601
+ # ── Section 7: Delivery Status Parsing ───────────────────────────────────
602
+
603
+
604
+ @pytest.mark.asyncio
605
+ async def test_parse_delivery_receipt():
606
+ """Delivery receipt payload → trigger_type=META_STATUS_UPDATE, watermark extracted."""
607
+ watermark = int(datetime.utcnow().timestamp() * 1000)
608
+ payload = _delivery_receipt_payload(watermark=watermark)
609
+ info = parse_webhook_payload("meta", payload)
610
+
611
+ assert info["trigger_type"] == "META_STATUS_UPDATE"
612
+ assert info["meta_delivery_watermark"] == watermark
613
+ assert info["meta_read_watermark"] is None
614
+
615
+
616
+ @pytest.mark.asyncio
617
+ async def test_parse_read_receipt():
618
+ """Read receipt payload → trigger_type=META_STATUS_UPDATE, read watermark extracted."""
619
+ watermark = int(datetime.utcnow().timestamp() * 1000)
620
+ payload = _read_receipt_payload(watermark=watermark)
621
+ info = parse_webhook_payload("meta", payload)
622
+
623
+ assert info["trigger_type"] == "META_STATUS_UPDATE"
624
+ assert info["meta_read_watermark"] == watermark
625
+
626
+
627
+ # ── Section 8: Observability ─────────────────────────────────────────────
628
+
629
+
630
+ @pytest.mark.asyncio
631
+ async def test_meta_metrics_tracking():
632
+ """Meta-specific metrics increment correctly."""
633
+ m = MetricsService()
634
+ m.increment("meta_oauth_started")
635
+ m.increment("meta_oauth_completed")
636
+ m.increment("meta_leads_ingested")
637
+ m.increment("meta_leads_ingested")
638
+ m.increment("webhooks_received", labels={"provider": "meta", "channel": "instagram"})
639
+
640
+ counts = m.get_counts()
641
+ assert counts["meta_oauth_started"] == 1
642
+ assert counts["meta_oauth_completed"] == 1
643
+ assert counts["meta_leads_ingested"] == 2
644
+ assert counts["webhooks_received{channel=instagram,provider=meta}"] == 1