Spaces:
Sleeping
Sleeping
File size: 2,345 Bytes
49e9f9d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 | """Fire-and-forget outbound webhook for alert events.
Used to integrate with external automation (n8n, Zapier, Make, custom cron
runners) without hard-coding a specific provider. When `ALERT_WEBHOOK_URL`
is set the backend POSTs a compact JSON payload on every new alert.
This runs in a detached task so request latency is never affected by a
slow/broken webhook. Failures are logged at WARNING level and dropped.
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any
import httpx
from ..core.config import settings
log = logging.getLogger(__name__)
def _webhook_payload(alert: dict[str, Any]) -> dict[str, Any]:
"""Trim the alert doc to what an automation actually needs. Keeps the
payload small (no base64 photos, no internal IDs) so consumers don't
have to learn our full schema."""
return {
"event": "alert.created",
"alert": {
"id": alert.get("id"),
"category": alert.get("category"),
"urgency": alert.get("urgency"),
"description": alert.get("description"),
"status": alert.get("status"),
"address": alert.get("address"),
"location": alert.get("location"),
"photo_count": alert.get("photo_count") or 0,
"verified_score": alert.get("verified_score"),
"created_at": str(alert.get("created_at") or ""),
},
}
async def _post(payload: dict[str, Any]) -> None:
url = settings.ALERT_WEBHOOK_URL
if not url:
return
try:
async with httpx.AsyncClient(
timeout=settings.ALERT_WEBHOOK_TIMEOUT_SECONDS
) as client:
await client.post(url, json=payload)
except Exception as exc: # noqa: BLE001 — log and drop
log.warning("alert webhook failed: %s", exc)
def fire_alert_created(alert: dict[str, Any]) -> None:
"""Schedule a detached webhook POST. Safe to call even if the URL is
unset — short-circuits without scheduling a task."""
if not settings.ALERT_WEBHOOK_URL:
return
payload = _webhook_payload(alert)
try:
asyncio.get_running_loop().create_task(_post(payload))
except RuntimeError:
# No running loop — caller is likely in a sync context; just skip
log.debug("no running loop for webhook dispatch")
|