# app/workers/tasks.py
import asyncio
from typing import Optional, Dict
from uuid import UUID
from datetime import datetime
from celery.utils.log import get_task_logger
from app.core.celery_app import celery_app
from app.core.db import engine
from app.models.models import WebhookEventLog, WebhookStatus
from sqlmodel import select
logger = get_task_logger(__name__)
def run_async(coro):
    """Run async code to completion from a sync Celery worker."""
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:  # no usable event loop in this worker thread
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(coro)
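# Pattern used throughout this module: each Celery task wraps its body in an
# inner async `_run()` coroutine and hands it to run_async(), bridging Celery's
# synchronous workers with the async SQLAlchemy session API.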
from app.models.models import (
Message,
DeliveryStatus,
FlowVersion,
ExecutionInstance,
ExecutionStatus,
Flow
)
from app.domain.contacts import normalize_phone
from app.domain.contacts import resolve_or_create_contact
from app.core.adk.runner import run_for_contact
from app.services.dispatch_service import DispatchService
def parse_webhook_payload(provider: str, payload: Dict) -> Dict:
"""Extract normalized fields from provider payloads."""
normalized = {
"trigger_type": None,
"provider_user_id": None,
"content": None,
"first_name": None,
"media_id": None,
"media_type": None,
"mime_type": None,
"caption": None,
"channel": None, # instagram | messenger | lead_ads
"attachment_url": None,
"leadgen_id": None,
"meta_delivery_watermark": None,
"meta_read_watermark": None,
}
try:
if provider == "whatsapp":
entry = payload.get("entry", [])[0]
value = entry.get("changes", [])[0].get("value", {})
messages = value.get("messages", [])
contacts = value.get("contacts", [])
if messages:
msg = messages[0]
msg_type = msg.get("type", "text")
normalized["trigger_type"] = "MESSAGE_INBOUND"
normalized["provider_user_id"] = msg.get("from")
if contacts:
normalized["first_name"] = contacts[0].get("profile", {}).get("name")
if msg_type == "text":
normalized["content"] = msg.get("text", {}).get("body")
elif msg_type in ("image", "audio", "video", "document"):
media_obj = msg.get(msg_type, {})
normalized["media_id"] = media_obj.get("id")
normalized["media_type"] = msg_type
normalized["mime_type"] = media_obj.get("mime_type")
normalized["caption"] = media_obj.get("caption")
normalized["content"] = media_obj.get("caption") or f"[{msg_type}]"
# WhatsApp status updates (delivery receipts)
statuses = value.get("statuses", [])
if statuses and not messages:
normalized["trigger_type"] = "STATUS_UPDATE"
normalized["statuses"] = statuses
elif provider == "meta":
entry = payload.get("entry", [])[0]
# Channel discrimination: instagram vs messenger
object_type = payload.get("object", "")
if object_type == "instagram":
normalized["channel"] = "instagram"
else:
normalized["channel"] = "messenger"
# Messaging (Messenger/Instagram)
if "messaging" in entry:
msg_event = entry["messaging"][0]
# Delivery receipts (watermark-based)
if "delivery" in msg_event:
normalized["trigger_type"] = "META_STATUS_UPDATE"
normalized["meta_delivery_watermark"] = msg_event["delivery"].get("watermark")
elif "read" in msg_event:
normalized["trigger_type"] = "META_STATUS_UPDATE"
normalized["meta_read_watermark"] = msg_event["read"].get("watermark")
elif msg_event.get("message"):
# Standard inbound message
normalized["trigger_type"] = "MESSAGE_INBOUND"
normalized["provider_user_id"] = msg_event.get("sender", {}).get("id")
message = msg_event.get("message", {})
normalized["content"] = message.get("text")
# Attachment parsing
attachments = message.get("attachments", [])
if attachments:
att = attachments[0]
att_type = att.get("type")
normalized["media_type"] = att_type
normalized["attachment_url"] = att.get("payload", {}).get("url")
if not normalized["content"]:
normalized["content"] = f"[{att_type}]"
# Lead Ads
elif "changes" in entry:
change = entry["changes"][0]
if change.get("field") == "leadgen":
normalized["trigger_type"] = "LEAD_AD_SUBMIT"
normalized["leadgen_id"] = change.get("value", {}).get("leadgen_id")
normalized["provider_user_id"] = normalized["leadgen_id"]
normalized["channel"] = "lead_ads"
normalized["content"] = "Lead Ad Submission"
    except (IndexError, KeyError, AttributeError) as parse_err:
        logger.debug(f"Could not fully parse {provider} payload: {parse_err}")
return normalized
# Status priority for forward-only progression
_STATUS_PRIORITY = {
DeliveryStatus.PENDING: 0,
DeliveryStatus.SENDING: 1,
DeliveryStatus.SENT: 2,
DeliveryStatus.DELIVERED: 3,
DeliveryStatus.READ: 4,
DeliveryStatus.FAILED: 5,
DeliveryStatus.DEAD_LETTER: 6,
}
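# Example: a message already READ (priority 4) ignores a late "delivered"
# webhook (priority 3) arriving out of order, rather than regressing.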
async def _process_status_updates(session, statuses: list, workspace_id, correlation_id: str):
"""Process WhatsApp delivery status webhooks (sent/delivered/read/failed)."""
from app.services.runtime_event_service import log_event
status_map = {
"sent": DeliveryStatus.SENT,
"delivered": DeliveryStatus.DELIVERED,
"read": DeliveryStatus.READ,
"failed": DeliveryStatus.FAILED,
}
for s in statuses:
provider_msg_id = s.get("id")
wa_status = s.get("status")
timestamp_str = s.get("timestamp")
if not provider_msg_id or wa_status not in status_map:
continue
result = await session.execute(
select(Message).where(Message.provider_message_id == provider_msg_id)
)
msg = result.scalars().first()
if not msg:
logger.info(f"Status update for unknown message {provider_msg_id}, skipping")
continue
new_status = status_map[wa_status]
old_priority = _STATUS_PRIORITY.get(msg.delivery_status, 0)
new_priority = _STATUS_PRIORITY.get(new_status, 0)
        # Only advance forward (don't regress from READ to DELIVERED)
if new_priority <= old_priority:
continue
ts = datetime.utcfromtimestamp(int(timestamp_str)) if timestamp_str else datetime.utcnow()
msg.delivery_status = new_status
if new_status == DeliveryStatus.SENT and not msg.sent_at:
msg.sent_at = ts
elif new_status == DeliveryStatus.DELIVERED:
msg.delivered_at = ts
elif new_status == DeliveryStatus.READ:
msg.read_at = ts
elif new_status == DeliveryStatus.FAILED:
errors = s.get("errors", [])
msg.failure_code = str(errors[0].get("code")) if errors else None
msg.last_error = errors[0].get("title") if errors else "Unknown failure"
session.add(msg)
await log_event(session, event_type="message.status_updated", source="webhook",
workspace_id=workspace_id, correlation_id=correlation_id,
related_ids={"message_id": str(msg.id)},
payload={"status": wa_status, "provider_message_id": provider_msg_id})
async def _process_meta_status_updates(session, info: dict, workspace_id, correlation_id: str):
"""Process Meta delivery/read status webhooks using watermark timestamps."""
from app.services.runtime_event_service import log_event
from sqlmodel import or_
delivery_watermark = info.get("meta_delivery_watermark")
read_watermark = info.get("meta_read_watermark")
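    # Meta watermarks are epoch-millisecond cutoffs meaning "everything sent
    # before this instant", so we sweep all matching SENT messages in one pass
    # instead of matching individual message IDs.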
if delivery_watermark:
watermark_ts = datetime.utcfromtimestamp(int(delivery_watermark) / 1000)
result = await session.execute(
select(Message).where(
Message.workspace_id == workspace_id,
Message.platform == "meta",
Message.direction == "outbound",
Message.delivery_status == DeliveryStatus.SENT,
Message.sent_at < watermark_ts,
)
)
for msg in result.scalars().all():
old_priority = _STATUS_PRIORITY.get(msg.delivery_status, 0)
new_priority = _STATUS_PRIORITY.get(DeliveryStatus.DELIVERED, 0)
if new_priority > old_priority:
msg.delivery_status = DeliveryStatus.DELIVERED
msg.delivered_at = watermark_ts
session.add(msg)
if read_watermark:
watermark_ts = datetime.utcfromtimestamp(int(read_watermark) / 1000)
result = await session.execute(
select(Message).where(
Message.workspace_id == workspace_id,
Message.platform == "meta",
Message.direction == "outbound",
or_(
Message.delivery_status == DeliveryStatus.SENT,
Message.delivery_status == DeliveryStatus.DELIVERED,
),
Message.sent_at < watermark_ts,
)
)
for msg in result.scalars().all():
old_priority = _STATUS_PRIORITY.get(msg.delivery_status, 0)
new_priority = _STATUS_PRIORITY.get(DeliveryStatus.READ, 0)
if new_priority > old_priority:
msg.delivery_status = DeliveryStatus.READ
msg.read_at = watermark_ts
if not msg.delivered_at:
msg.delivered_at = watermark_ts
session.add(msg)
await log_event(session, event_type="message.meta_status_updated", source="webhook",
workspace_id=workspace_id, correlation_id=correlation_id,
payload={"delivery_watermark": delivery_watermark,
"read_watermark": read_watermark})
@celery_app.task(name="app.workers.tasks.process_webhook_event")
def process_webhook_event(event_id: str):
"""
Process a stored webhook event and trigger automation flows.
"""
async def _run():
logger.info(f"Processing webhook event: {event_id}")
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.runtime_event_service import log_event
async with AsyncSession(engine, expire_on_commit=False) as session:
event = await session.get(WebhookEventLog, UUID(event_id))
if not event or not event.workspace_id:
logger.error(f"Event {event_id} not found or has no workspace")
return
event.status = WebhookStatus.QUEUED
session.add(event)
await log_event(session, event_type="webhook.processing_started", source="webhook",
workspace_id=event.workspace_id, correlation_id=str(event.correlation_id),
related_ids={"webhook_event_id": event_id})
await session.commit()
try:
# 0. Loop Prevention (Part D)
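                # Meta re-delivers messages sent by our own page with
                # `is_echo: true`; treating them as inbound would re-trigger
                # the flow and create an infinite send/receive loop.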
is_echo = False
if event.provider == "meta":
entry = event.payload.get("entry", [{}])[0]
if "messaging" in entry:
msg_event = entry["messaging"][0]
if msg_event.get("message", {}).get("is_echo"):
is_echo = True
if is_echo:
logger.info(f"Ignoring echo event {event_id} to prevent loops")
event.status = WebhookStatus.PROCESSED
session.add(event)
await session.commit()
return
# 1. Parse Payload
info = parse_webhook_payload(event.provider, event.payload)
# 1a. Handle WhatsApp delivery status updates
if info.get("trigger_type") == "STATUS_UPDATE" and info.get("statuses"):
await _process_status_updates(
session, info["statuses"], event.workspace_id,
str(event.correlation_id)
)
event.status = WebhookStatus.PROCESSED
event.processed_at = datetime.utcnow()
session.add(event)
await session.commit()
return
# 1a2. Handle Meta delivery/read status updates (watermark-based)
if info.get("trigger_type") == "META_STATUS_UPDATE":
await _process_meta_status_updates(
session, info, event.workspace_id,
str(event.correlation_id)
)
event.status = WebhookStatus.PROCESSED
event.processed_at = datetime.utcnow()
session.add(event)
await session.commit()
return
if not info["trigger_type"]:
logger.info(f"No actionable trigger found in payload for event {event_id}")
event.status = WebhookStatus.PROCESSED
session.add(event)
await session.commit()
return
# 1b. Normalize phone number for WhatsApp
if event.provider == "whatsapp" and info.get("provider_user_id"):
info["provider_user_id"] = normalize_phone(info["provider_user_id"])
# 2. Resolve Contact & Store Message
contact, identity, conversation = await resolve_or_create_contact(
session,
event.workspace_id,
event.provider,
info["provider_user_id"],
first_name=info["first_name"]
)
if info["trigger_type"] == "MESSAGE_INBOUND":
msg_metadata = {}
if info.get("channel"):
msg_metadata["channel"] = info["channel"]
if info.get("attachment_url"):
msg_metadata["attachment_url"] = info["attachment_url"]
msg_metadata["media_type"] = info.get("media_type")
if info.get("media_id"):
msg_metadata["media_id"] = info["media_id"]
msg_metadata["media_type"] = info["media_type"]
msg_metadata["mime_type"] = info["mime_type"]
# Download and store media (WhatsApp)
try:
from app.services.media_service import download_and_store_media
media_path = await download_and_store_media(
session, event.workspace_id, info["media_id"],
info["mime_type"], event.provider
)
if media_path:
msg_metadata["media_path"] = media_path
except Exception as media_err:
logger.warning(f"Media download failed for {info['media_id']}: {media_err}")
inbound_msg = Message(
workspace_id=event.workspace_id,
conversation_id=conversation.id,
direction="inbound",
content=info["content"] or "",
platform=event.provider,
delivery_status="delivered",
additional_metadata=msg_metadata,
)
session.add(inbound_msg)
# 2b. Lead Ads: fetch lead data and enrich contact
if info["trigger_type"] == "LEAD_AD_SUBMIT" and info.get("leadgen_id"):
from app.services.meta_service import fetch_lead_data, parse_lead_fields
from app.models.models import Integration
from app.core.security import decrypt_data
from app.services.metrics_service import metrics
integration_result = await session.execute(
select(Integration).where(
Integration.workspace_id == event.workspace_id,
Integration.provider == "meta"
)
)
integration = integration_result.scalars().first()
if integration and integration.encrypted_config:
try:
config = decrypt_data(integration.encrypted_config)
lead_raw = await fetch_lead_data(
info["leadgen_id"], config.get("access_token")
)
lead_fields = parse_lead_fields(
lead_raw.get("field_data", [])
)
# Enrich contact metadata
updated_meta = dict(contact.additional_metadata or {})
if lead_fields.get("email"):
updated_meta["email"] = lead_fields["email"]
if lead_fields.get("phone"):
updated_meta["phone"] = lead_fields["phone"]
updated_meta["lead_source"] = "meta_lead_ad"
updated_meta["leadgen_id"] = info["leadgen_id"]
contact.additional_metadata = updated_meta
if lead_fields.get("first_name") and not contact.first_name:
contact.first_name = lead_fields["first_name"]
if lead_fields.get("last_name") and not contact.last_name:
contact.last_name = lead_fields["last_name"]
session.add(contact)
# Store lead as message
lead_msg = Message(
workspace_id=event.workspace_id,
conversation_id=conversation.id,
direction="inbound",
content=f"Lead Ad: {lead_fields.get('full_name', lead_fields.get('first_name', 'Unknown'))}",
platform="meta",
delivery_status="delivered",
additional_metadata={
"type": "lead_ad_submission",
"leadgen_id": info["leadgen_id"],
"lead_fields": lead_fields,
"channel": "lead_ads",
},
)
session.add(lead_msg)
metrics.increment("meta_leads_ingested")
await log_event(
session, event_type="meta.lead_ingested",
source="webhook",
workspace_id=event.workspace_id,
correlation_id=str(event.correlation_id),
payload={"leadgen_id": info["leadgen_id"]},
)
except Exception as lead_err:
logger.warning(f"Lead data fetch failed for {info['leadgen_id']}: {lead_err}")
# 3. Find Matching Published Flow (prefer published_version_id pointer)
flow_version = None
flow_query = select(Flow).where(
Flow.workspace_id == event.workspace_id,
Flow.status == "published",
                    Flow.published_version_id.is_not(None),
).limit(1)
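                # A single published flow per workspace is assumed here;
                # .limit(1) simply takes the first match.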
flow_result = (await session.execute(flow_query)).scalars().first()
if flow_result and flow_result.published_version_id:
flow_version = await session.get(FlowVersion, flow_result.published_version_id)
# Fallback: legacy flows published before Mission 27 (published_version_id not set)
if not flow_version:
legacy_query = select(FlowVersion).join(Flow).where(
Flow.workspace_id == event.workspace_id,
                        FlowVersion.is_published.is_(True)
).order_by(FlowVersion.created_at.desc())
flow_version = (await session.execute(legacy_query)).scalars().first()
if flow_version:
# 4. Create Execution Instance
nodes = flow_version.definition_json.get("nodes", [])
# Use start_node_id if available (Mission 27 format), else find TRIGGER node
start_node_id = flow_version.definition_json.get("start_node_id")
if start_node_id:
start_node = next((n for n in nodes if n.get("id") == start_node_id), None)
else:
start_node = next((n for n in nodes if n.get("type") == "TRIGGER"), None)
if not start_node and nodes:
start_node = nodes[0]
instance = ExecutionInstance(
workspace_id=event.workspace_id,
flow_version_id=flow_version.id,
contact_id=contact.id,
status=ExecutionStatus.RUNNING,
current_node_id=UUID(start_node["id"]) if start_node else None
)
session.add(instance)
await log_event(session, event_type="runtime.execution_created", source="runtime",
workspace_id=event.workspace_id, correlation_id=str(event.correlation_id),
related_ids={"webhook_event_id": event_id, "execution_instance_id": str(instance.id)})
await session.commit()
                    # 5. Execute Runtime (ADK, Mission M-A)
await run_for_contact(
workspace_id=event.workspace_id,
contact_id=contact.id,
conversation_id=conversation.id,
inbound_message=info.get("content") or "",
execution_instance=instance,
session=session,
)
# Mark event success
event.status = WebhookStatus.PROCESSED
event.processed_at = datetime.utcnow()
session.add(event)
await log_event(session, event_type="webhook.processing_completed", source="webhook",
workspace_id=event.workspace_id, correlation_id=str(event.correlation_id),
related_ids={"webhook_event_id": event_id})
await session.commit()
except Exception as e:
logger.exception(f"Failed to process event {event_id}")
event.status = WebhookStatus.FAILED
event.last_error = str(e)
event.attempts += 1
session.add(event)
await log_event(session, event_type="webhook.processing_failed", source="webhook",
workspace_id=event.workspace_id, correlation_id=str(event.correlation_id),
outcome="failure", error_message=str(e),
related_ids={"webhook_event_id": event_id})
await session.commit()
run_async(_run())
@celery_app.task(name="app.workers.tasks.dispatch_message_task")
def dispatch_message_task(message_id: str):
"""Immediate dispatch task for a single outbound message."""
logger.info(f"Celery: Dispatching message {message_id}")
from sqlalchemy.ext.asyncio import AsyncSession
async def _run():
async with AsyncSession(engine, expire_on_commit=False) as session:
msg = await session.get(Message, UUID(message_id))
if msg:
await DispatchService.dispatch_message(session, msg)
else:
logger.error(f"Message {message_id} not found for dispatch")
run_async(_run())
@celery_app.task(name="app.workers.tasks.dispatch_pending_task")
def dispatch_pending_task(workspace_id: Optional[str] = None):
"""Periodic task to poll and dispatch missed outbound intents."""
logger.info(f"Celery: Polling pending messages (Workspace: {workspace_id})")
from sqlalchemy.ext.asyncio import AsyncSession
async def _run():
async with AsyncSession(engine, expire_on_commit=False) as session:
await DispatchService.dispatch_pending_messages(
session,
workspace_id=UUID(workspace_id) if workspace_id else None
)
run_async(_run())
async def _expire_plan_overrides_core(db):
"""Core expiry logic β€” accepts a session for testability."""
from app.models.models import PlanOverride
from app.services.audit_service import audit_event
now = datetime.utcnow()
result = await db.execute(
select(PlanOverride).where(
PlanOverride.status == "active",
PlanOverride.ends_at <= now,
)
)
stale = result.scalars().all()
for override in stale:
override.status = "expired"
await audit_event(
db=db,
action="PLAN_OVERRIDE_EXPIRED",
entity_type="plan_override",
entity_id=str(override.id),
actor_type="system",
workspace_id=override.workspace_id,
outcome="success",
metadata={"ends_at": override.ends_at.isoformat()},
)
if stale:
await db.commit()
logger.info(f"[expire_plan_overrides_task] Expired {len(stale)} plan override(s)")
@celery_app.task(name="app.workers.tasks.expire_plan_overrides_task")
def expire_plan_overrides_task():
"""Every-5-min task: mark ACTIVE plan overrides past their ends_at as EXPIRED and write audit log."""
from sqlalchemy.ext.asyncio import AsyncSession
async def _run():
async with AsyncSession(engine, expire_on_commit=False) as session:
await _expire_plan_overrides_core(session)
run_async(_run())
@celery_app.task(name="app.workers.tasks.purge_runtime_events_task")
def purge_runtime_events_task():
"""Daily task to purge old runtime events based on retention policy."""
from app.core.config import settings as app_settings
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.models import RuntimeEventLog
from datetime import timedelta
from sqlalchemy import delete as sa_delete
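    # Delete in fixed batches of 1000 ids so each DELETE stays short-lived and
    # the purge never holds a long-running lock on a busy table.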
async def _run():
cutoff = datetime.utcnow() - timedelta(days=app_settings.RUNTIME_EVENT_RETENTION_DAYS)
total_purged = 0
async with AsyncSession(engine, expire_on_commit=False) as session:
while True:
query = select(RuntimeEventLog.id).where(
RuntimeEventLog.created_at < cutoff
).limit(1000)
result = await session.execute(query)
                ids = result.scalars().all()
if not ids:
break
await session.execute(
sa_delete(RuntimeEventLog).where(RuntimeEventLog.id.in_(ids))
)
await session.commit()
total_purged += len(ids)
logger.info(f"Purged {total_purged} runtime events older than {app_settings.RUNTIME_EVENT_RETENTION_DAYS} days")
run_async(_run())
@celery_app.task(name="app.workers.tasks.export_to_hf_task")
def export_to_hf_task():
"""Daily task: export all LeadPilot data (static + dynamic) to HuggingFace dataset repo.
Silently no-ops if HF_TOKEN or HF_DATASET_REPO is not configured."""
import sys as _sys
import os as _os
from app.core.config import settings as app_settings
if not app_settings.HF_TOKEN or not app_settings.HF_DATASET_REPO:
logger.warning(
"[export_to_hf_task] HF_TOKEN or HF_DATASET_REPO not configured β€” skipping export."
)
return
async def _run():
        # Locate scripts/ directory relative to this file: app/workers/tasks.py -> backend/scripts/
scripts_dir = _os.path.join(
_os.path.dirname( # backend/
_os.path.dirname( # app/
_os.path.dirname(_os.path.abspath(__file__)) # app/workers/
)
),
"scripts",
)
if scripts_dir not in _sys.path:
_sys.path.insert(0, scripts_dir)
from export_to_hf import run_export
try:
await run_export(
mode="all",
batch_size=5000,
dry_run=False,
hf_token=app_settings.HF_TOKEN,
repo_id=app_settings.HF_DATASET_REPO,
)
logger.info("[export_to_hf_task] Export completed successfully.")
except Exception:
logger.exception(
"[export_to_hf_task] Export failed β€” will retry on next scheduled run."
)
run_async(_run())
@celery_app.task(name="app.workers.tasks.resume_waiting_instances_task")
def resume_waiting_instances_task():
"""
Every-minute task: find WAITING ExecutionInstances whose delay has elapsed
and re-trigger the ADK runner for them (Mission M-D: WAIT_DELAY support).
"""
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.models import Conversation
async def _run():
now = datetime.utcnow()
async with AsyncSession(engine, expire_on_commit=False) as session:
result = await session.execute(
select(ExecutionInstance).where(
ExecutionInstance.status == ExecutionStatus.WAITING,
ExecutionInstance.resume_at <= now,
)
)
instances = result.scalars().all()
if not instances:
return
for instance in instances:
try:
instance.status = ExecutionStatus.RUNNING
session.add(instance)
await session.flush()
# Resolve conversation_id for this contact in this workspace
conv_result = await session.execute(
select(Conversation).where(
Conversation.workspace_id == instance.workspace_id,
Conversation.contact_id == instance.contact_id,
).limit(1)
)
conversation = conv_result.scalars().first()
if not conversation:
logger.warning(
f"[resume_waiting_instances_task] No conversation for instance {instance.id}"
)
continue
await run_for_contact(
workspace_id=instance.workspace_id,
contact_id=instance.contact_id,
conversation_id=conversation.id,
inbound_message="[Resumed after delay]",
execution_instance=instance,
session=session,
)
except Exception:
logger.exception(
f"[resume_waiting_instances_task] Failed to resume instance {instance.id}"
)
await session.commit()
logger.info(f"[resume_waiting_instances_task] Resumed {len(instances)} waiting instance(s)")
run_async(_run())