# File: codebook/potato/webhooks/sender.py (10.3 kB)
# Last commit by davidjurgens: "Deploy: Potato — Codebook Annotation" (aceb1b2, verified)
"""
Webhook Delivery Queue
Daemon thread + queue for non-blocking webhook dispatch with SQLite-backed
retry store. Annotation requests are never blocked by webhook delivery.
"""
import json
import logging
import os
import queue
import sqlite3
import threading
import time
import uuid
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
from .signing import build_headers
logger = logging.getLogger(__name__)

# Backoff delays in seconds, indexed by attempt number. Entry 0 stands for
# the initial attempt, which is sent as soon as it is dequeued; every later
# entry is the wait before the corresponding retry (a small jitter is added
# on top when the retry is scheduled).
RETRY_SCHEDULE = [0, 5, 30, 120, 600, 3600]

# One retry for every schedule entry after the initial attempt.
MAX_RETRIES = len(RETRY_SCHEDULE[1:])

# Upper bound on pending events. When the queue is full, new events are
# dropped instead of blocking the annotation request that produced them.
MAX_QUEUE_SIZE = 10_000
class WebhookDeliveryQueue:
    """Background webhook delivery queue with retry support.

    Events are handed to a daemon thread through a bounded stdlib
    ``queue.Queue``, so callers never block on network I/O. A failed
    delivery is rescheduled according to ``RETRY_SCHEDULE`` until
    ``MAX_RETRIES`` retries have been used up. Pending retries are kept in a
    SQLite database when ``output_dir`` is given, otherwise in an in-memory
    dict that does not survive a restart.

    NOTE(review): the HMAC ``secret`` of every pending retry is stored in
    plaintext in the SQLite file; restrict access to ``output_dir``.
    """

    def __init__(self, output_dir=None):
        """Initialize the delivery queue.

        Args:
            output_dir: Directory for the retry SQLite database
                (``webhook_retries.db``, created if missing). If None,
                retries are kept in memory only.
        """
        self._queue = queue.Queue(maxsize=MAX_QUEUE_SIZE)
        self._running = False
        self._thread = None
        self._db_path = None
        # Guards both the SQLite store and the in-memory fallback store.
        self._db_lock = threading.Lock()
        # Retry store used when no output_dir is configured:
        # webhook_id -> record with the same fields as the SQLite table
        # (payload kept as bytes under "payload_bytes").
        self._memory_retries = {}
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
            self._db_path = os.path.join(output_dir, "webhook_retries.db")
            self._init_db()

    def _connect(self):
        """Open a new, short-lived connection to the retry database.

        Each operation opens and closes its own connection because the
        store is used from the delivery thread as well as from whichever
        thread constructs the queue or calls ``get_retry_count``.
        """
        return sqlite3.connect(self._db_path)

    def _init_db(self):
        """Create the retry store table if it doesn't exist.

        Raises:
            sqlite3.Error: if the database cannot be opened or created.
        """
        with self._db_lock:
            conn = self._connect()
            try:
                conn.execute("PRAGMA journal_mode=WAL")
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS webhook_retries (
                        id TEXT PRIMARY KEY,
                        url TEXT NOT NULL,
                        secret TEXT DEFAULT '',
                        payload TEXT NOT NULL,
                        attempt INTEGER DEFAULT 0,
                        next_retry_at REAL NOT NULL,
                        created_at REAL NOT NULL,
                        last_error TEXT DEFAULT ''
                    )
                """)
                conn.commit()
            finally:
                conn.close()

    def start(self):
        """Start the background delivery thread (no-op if already running)."""
        if self._running:
            return
        self._running = True
        self._thread = threading.Thread(
            target=self._worker_loop,
            name="webhook-delivery",
            daemon=True,
        )
        self._thread.start()
        logger.debug("Webhook delivery thread started")

    def stop(self):
        """Signal the delivery thread to stop and wait up to 5 s for it.

        Events still waiting in the queue when the thread exits are not
        delivered. The join can time out while an HTTP request (10 s
        timeout) is in flight.
        """
        self._running = False
        # Push a sentinel to unblock a thread waiting in queue.get().
        try:
            self._queue.put_nowait(None)
        except queue.Full:
            pass  # The thread will see _running == False on its next get.
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=5)
        logger.debug("Webhook delivery thread stopped")

    def enqueue(self, url, secret, payload_bytes, webhook_id=None):
        """Add a delivery to the queue (non-blocking).

        Args:
            url: Endpoint URL.
            secret: HMAC secret for signing.
            payload_bytes: JSON payload as bytes.
            webhook_id: Optional delivery ID; a random ``msg_...`` ID is
                generated when omitted.

        Returns:
            True if enqueued, False if the queue was full (event dropped).
        """
        delivery = {
            "id": webhook_id or f"msg_{uuid.uuid4().hex[:24]}",
            "url": url,
            "secret": secret,
            "payload_bytes": payload_bytes,
            "attempt": 0,
        }
        try:
            self._queue.put_nowait(delivery)
            return True
        except queue.Full:
            logger.warning("Webhook queue full, dropping event for %s", url)
            return False

    def _worker_loop(self):
        """Main loop for the delivery thread.

        Each iteration first delivers any retries that are due, then waits
        up to one second for a new event. Unexpected exceptions are logged
        instead of propagating: an uncaught exception would end the thread
        and silently stop all webhook delivery.
        """
        while self._running:
            try:
                self._process_retries()
            except Exception:
                logger.exception("Unexpected error while processing webhook retries")

            try:
                delivery = self._queue.get(timeout=1.0)
            except queue.Empty:
                continue

            if delivery is None:
                # Shutdown sentinel. One left over from an earlier stop()
                # (the old thread may have exited without consuming it)
                # must not end a thread started by a later start().
                if self._running:
                    continue
                break

            try:
                self._deliver(delivery)
            except Exception:
                logger.exception("Unexpected error delivering webhook %s",
                                 delivery.get("id"))

    def _deliver(self, delivery):
        """Make one delivery attempt, rescheduling the webhook on failure.

        Any exception raised while signing or sending (HTTP errors,
        connection errors, invalid URLs, ...) counts as a failed attempt and
        is passed to ``_handle_failure``.

        Args:
            delivery: Dict with id, url, secret, payload_bytes, attempt and,
                for re-delivered retries, created_at.
        """
        url = delivery["url"]
        webhook_id = delivery["id"]
        attempt = delivery.get("attempt", 0)
        payload_bytes = delivery["payload_bytes"]

        try:
            headers = build_headers(delivery.get("secret", ""), payload_bytes,
                                    webhook_id=webhook_id)
            req = Request(url, data=payload_bytes, headers=headers, method="POST")
            with urlopen(req, timeout=10) as resp:
                status = resp.getcode()
        except HTTPError as e:
            self._handle_failure(delivery, f"HTTP {e.code}: {e.reason}")
            return
        except Exception as e:  # URLError, OSError, ValueError, signing errors
            self._handle_failure(delivery, str(e))
            return

        if status and 200 <= status < 300:
            logger.debug("Webhook delivered: %s -> %s (attempt %d)",
                         webhook_id, url, attempt)
            # Remove from the retry store in case it is still there.
            self._remove_retry(webhook_id)
        else:
            self._handle_failure(delivery, f"HTTP {status}")

    def _handle_failure(self, delivery, error_msg):
        """Schedule a retry for a failed delivery, or give up after MAX_RETRIES."""
        attempt = delivery.get("attempt", 0)
        url = delivery["url"]
        webhook_id = delivery["id"]

        if attempt >= MAX_RETRIES:
            logger.error("Webhook permanently failed after %d attempts: %s -> %s: %s",
                         attempt + 1, webhook_id, url, error_msg)
            self._remove_retry(webhook_id)
            return

        next_attempt = attempt + 1
        delay = RETRY_SCHEDULE[min(next_attempt, len(RETRY_SCHEDULE) - 1)]
        # Adds 0-9% of the delay to spread retries of different webhooks.
        # str hashes are salted per process (PYTHONHASHSEED), so this value is
        # stable only within a single process run.
        jitter = delay * 0.1 * (hash(webhook_id) % 10) / 10
        next_retry_at = time.time() + delay + jitter
        logger.warning("Webhook delivery failed (attempt %d/%d): %s -> %s: %s. "
                       "Retry in %.0fs",
                       next_attempt, MAX_RETRIES + 1, webhook_id, url,
                       error_msg, delay + jitter)
        self._store_retry(delivery, next_attempt, next_retry_at, error_msg)

    def _store_retry(self, delivery, attempt, next_retry_at, error_msg):
        """Store (or replace) a failed delivery so it is retried later.

        ``created_at`` keeps the time of the first failure across retries.
        SQLite errors are logged and the retry is lost.
        """
        created_at = delivery.get("created_at")
        if created_at is None:
            created_at = time.time()

        if not self._db_path:
            with self._db_lock:
                self._memory_retries[delivery["id"]] = {
                    "id": delivery["id"],
                    "url": delivery["url"],
                    "secret": delivery.get("secret", ""),
                    "payload_bytes": delivery["payload_bytes"],
                    "attempt": attempt,
                    "next_retry_at": next_retry_at,
                    "created_at": created_at,
                    "last_error": error_msg,
                }
            return

        with self._db_lock:
            try:
                conn = self._connect()
                try:
                    conn.execute(
                        """INSERT OR REPLACE INTO webhook_retries
                           (id, url, secret, payload, attempt, next_retry_at,
                            created_at, last_error)
                           VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                        (
                            delivery["id"],
                            delivery["url"],
                            delivery.get("secret", ""),
                            delivery["payload_bytes"].decode("utf-8", errors="replace"),
                            attempt,
                            next_retry_at,
                            created_at,
                            error_msg,
                        ),
                    )
                    conn.commit()
                finally:
                    conn.close()
            except sqlite3.Error as e:
                logger.error("Failed to store webhook retry: %s", e)

    def _remove_retry(self, webhook_id):
        """Remove a delivery from the retry store (no-op if absent)."""
        if not self._db_path:
            with self._db_lock:
                self._memory_retries.pop(webhook_id, None)
            return

        with self._db_lock:
            try:
                conn = self._connect()
                try:
                    conn.execute("DELETE FROM webhook_retries WHERE id = ?",
                                 (webhook_id,))
                    conn.commit()
                finally:
                    conn.close()
            except sqlite3.Error as e:
                logger.error("Failed to remove webhook retry: %s", e)

    def _claim_due_db_retries(self, now, limit=10):
        """Fetch and delete up to ``limit`` SQLite retries due at ``now``.

        Rows are deleted before delivery so they are not picked up twice. A
        failed attempt re-inserts the row through ``_store_retry``.

        Returns:
            List of delivery dicts, earliest ``next_retry_at`` first; empty on
            SQLite errors.
        """
        with self._db_lock:
            try:
                conn = self._connect()
                try:
                    rows = conn.execute(
                        """SELECT id, url, secret, payload, attempt, created_at
                           FROM webhook_retries
                           WHERE next_retry_at <= ?
                           ORDER BY next_retry_at
                           LIMIT ?""",
                        (now, limit),
                    ).fetchall()
                    conn.executemany("DELETE FROM webhook_retries WHERE id = ?",
                                     [(row[0],) for row in rows])
                    conn.commit()
                finally:
                    conn.close()
            except sqlite3.Error as e:
                logger.error("Failed to process webhook retries: %s", e)
                return []

        return [
            {
                "id": row_id,
                "url": url,
                "secret": secret,
                "payload_bytes": payload.encode("utf-8"),
                "attempt": attempt,
                "created_at": created_at,
            }
            for row_id, url, secret, payload, attempt, created_at in rows
        ]

    def _claim_due_memory_retries(self, now, limit=10):
        """Pop up to ``limit`` in-memory retries due at ``now``, earliest first."""
        with self._db_lock:
            due = sorted(
                (rec for rec in self._memory_retries.values()
                 if rec["next_retry_at"] <= now),
                key=lambda rec: rec["next_retry_at"],
            )[:limit]
            for rec in due:
                del self._memory_retries[rec["id"]]
        return due

    def _process_retries(self):
        """Deliver up to 10 retries whose scheduled time has passed."""
        now = time.time()
        if self._db_path:
            due = self._claim_due_db_retries(now)
        else:
            due = self._claim_due_memory_retries(now)
        for delivery in due:
            self._deliver(delivery)

    def get_retry_count(self):
        """Return the number of stored pending retries (for admin API).

        Retries currently being re-delivered are not counted. Returns 0 if
        the SQLite store cannot be read.
        """
        if not self._db_path:
            with self._db_lock:
                return len(self._memory_retries)

        with self._db_lock:
            try:
                conn = self._connect()
                try:
                    return conn.execute(
                        "SELECT COUNT(*) FROM webhook_retries").fetchone()[0]
                finally:
                    conn.close()
            except sqlite3.Error:
                return 0