"""
Webhook Delivery Queue

Daemon thread + queue for non-blocking webhook dispatch with a retry store
(SQLite-backed when an output directory is configured, in-memory otherwise).
Annotation requests are never blocked by webhook delivery.
"""

import contextlib
import json
import logging
import os
import queue
import sqlite3
import threading
import time
import uuid
import zlib
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError

from .signing import build_headers

logger = logging.getLogger(__name__)

# Retry schedule in seconds (jitter is added when a retry is scheduled)
RETRY_SCHEDULE = [0, 5, 30, 120, 600, 3600]
MAX_RETRIES = len(RETRY_SCHEDULE) - 1

# Queue size limit — if full, events are dropped (never block annotations)
MAX_QUEUE_SIZE = 10000

# Maximum number of due retries claimed per pass of the worker loop
_RETRY_BATCH_SIZE = 10


class WebhookDeliveryQueue:
    """Background delivery queue with retry support.

    Uses a daemon thread and stdlib queue.Queue for non-blocking dispatch.
    Failed deliveries are stored for retry: in SQLite when an output
    directory was given, otherwise in a bounded in-memory dict.
    """

    def __init__(self, output_dir=None):
        """Initialize the delivery queue.

        Args:
            output_dir: Directory for the retry SQLite database.
                If None, retries are in-memory only.
        """
        self._queue = queue.Queue(maxsize=MAX_QUEUE_SIZE)
        self._running = False
        self._thread = None
        self._db_path = None
        # Guards the retry store (SQLite file or the in-memory dict below).
        self._db_lock = threading.Lock()
        # Fallback retry store used when no output_dir is configured:
        # delivery id -> (next_retry_at, delivery dict).
        self._memory_retries = {}

        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
            self._db_path = os.path.join(output_dir, "webhook_retries.db")
            self._init_db()

    def _init_db(self):
        """Create the retry store table if it doesn't exist."""
        with self._db_lock:
            # closing() guarantees the connection is released even if a
            # statement raises; sqlite3's own context manager only commits.
            with contextlib.closing(sqlite3.connect(self._db_path)) as conn:
                conn.execute("PRAGMA journal_mode=WAL")
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS webhook_retries (
                        id TEXT PRIMARY KEY,
                        url TEXT NOT NULL,
                        secret TEXT DEFAULT '',
                        payload TEXT NOT NULL,
                        attempt INTEGER DEFAULT 0,
                        next_retry_at REAL NOT NULL,
                        created_at REAL NOT NULL,
                        last_error TEXT DEFAULT ''
                    )
                """)
                conn.commit()

    def start(self):
        """Start the background delivery thread (no-op if already running)."""
        if self._running:
            return
        self._running = True
        self._thread = threading.Thread(
            target=self._worker_loop,
            name="webhook-delivery",
            daemon=True,
        )
        self._thread.start()
        logger.debug("Webhook delivery thread started")

    def stop(self):
        """Stop the delivery thread gracefully.

        Waits up to 5 seconds for the worker to finish its current work.
        """
        self._running = False
        # Push sentinel to unblock the queue
        try:
            self._queue.put_nowait(None)
        except queue.Full:
            # The worker is not blocked on an empty queue in this case; it
            # will notice _running is False after its current item.
            pass
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=5)
        if self._thread and self._thread.is_alive():
            logger.warning("Webhook delivery thread still busy after stop()")
        else:
            logger.debug("Webhook delivery thread stopped")

    def enqueue(self, url, secret, payload_bytes, webhook_id=None):
        """Add a delivery to the queue (non-blocking).

        Args:
            url: Endpoint URL.
            secret: HMAC secret for signing.
            payload_bytes: JSON payload as bytes.
            webhook_id: Optional delivery ID.

        Returns:
            True if enqueued, False if queue was full (event dropped).
        """
        delivery = {
            "id": webhook_id or f"msg_{uuid.uuid4().hex[:24]}",
            "url": url,
            "secret": secret,
            "payload_bytes": payload_bytes,
            "attempt": 0,
        }
        try:
            self._queue.put_nowait(delivery)
            return True
        except queue.Full:
            logger.warning("Webhook queue full, dropping event for %s", url)
            return False

    def _worker_loop(self):
        """Main loop for the delivery thread."""
        while self._running:
            # Process retries first
            self._process_retries()

            # Process new deliveries
            try:
                delivery = self._queue.get(timeout=1.0)
            except queue.Empty:
                continue

            if delivery is None:
                # Wake-up sentinel from stop(). Let the loop condition decide:
                # a stale sentinel left over from an earlier stop() must not
                # terminate a freshly restarted worker.
                continue

            self._deliver_safely(delivery)

    def _deliver_safely(self, delivery):
        """Run _deliver, logging (not propagating) unexpected errors.

        An escaped exception would otherwise end the daemon thread and
        silently halt all further webhook dispatch.
        """
        try:
            self._deliver(delivery)
        except Exception:
            logger.exception("Unexpected error while delivering webhook")

    def _deliver(self, delivery):
        """Attempt to deliver a webhook.

        On a 2xx response the delivery is removed from the retry store;
        any other outcome is routed to _handle_failure.

        Args:
            delivery: Dict with id, url, secret, payload_bytes, attempt.
        """
        url = delivery["url"]
        secret = delivery.get("secret", "")
        payload_bytes = delivery["payload_bytes"]
        attempt = delivery.get("attempt", 0)
        webhook_id = delivery["id"]

        try:
            # Signing happens inside the try so a signing error is treated
            # as a failed attempt instead of escaping from the worker.
            headers = build_headers(secret, payload_bytes, webhook_id=webhook_id)
            req = Request(
                url,
                data=payload_bytes,
                headers=headers,
                method="POST",
            )
            with urlopen(req, timeout=10) as resp:
                status = resp.getcode()
        except HTTPError as e:
            self._handle_failure(delivery, f"HTTP {e.code}: {e.reason}")
            return
        except (URLError, OSError) as e:
            self._handle_failure(delivery, str(e))
            return
        except Exception as e:
            # Include the type: str() of some exceptions is empty.
            self._handle_failure(delivery, f"{type(e).__name__}: {e}")
            return

        if status and 200 <= status < 300:
            logger.debug("Webhook delivered: %s -> %s (attempt %d)",
                         webhook_id, url, attempt)
            # Remove from retry store if it was there
            self._remove_retry(webhook_id)
        else:
            self._handle_failure(delivery, f"HTTP {status}")

    def _handle_failure(self, delivery, error_msg):
        """Handle a failed delivery attempt.

        Schedules the next retry according to RETRY_SCHEDULE, or gives up
        (and clears any stored retry) once MAX_RETRIES has been reached.
        """
        attempt = delivery.get("attempt", 0)
        url = delivery["url"]
        webhook_id = delivery["id"]

        if attempt >= MAX_RETRIES:
            logger.error("Webhook permanently failed after %d attempts: %s -> %s: %s",
                         attempt + 1, webhook_id, url, error_msg)
            self._remove_retry(webhook_id)
            return

        # Schedule retry with jitter
        next_attempt = attempt + 1
        delay = RETRY_SCHEDULE[min(next_attempt, len(RETRY_SCHEDULE) - 1)]
        # Deterministic jitter of 0-9% of the delay, derived from the ID.
        # crc32 is used because the built-in hash() of a str is salted per
        # process and therefore not reproducible across runs.
        bucket = zlib.crc32(str(webhook_id).encode("utf-8")) % 10
        jitter = delay * 0.1 * bucket / 10
        next_retry_at = time.time() + delay + jitter

        logger.warning("Webhook delivery failed (attempt %d/%d): %s -> %s: %s. "
                       "Retry in %.0fs",
                       next_attempt, MAX_RETRIES + 1, webhook_id, url,
                       error_msg, delay + jitter)

        self._store_retry(delivery, next_attempt, next_retry_at, error_msg)

    def _store_retry(self, delivery, attempt, next_retry_at, error_msg):
        """Store a failed delivery for retry.

        Args:
            delivery: The delivery dict that failed.
            attempt: Attempt number the retry will carry.
            next_retry_at: Epoch seconds at which the retry becomes due.
            error_msg: Description of the failure (persisted in SQLite only).
        """
        if not self._db_path:
            with self._db_lock:
                is_new = delivery["id"] not in self._memory_retries
                # Bound memory use the same way the dispatch queue is bounded.
                if is_new and len(self._memory_retries) >= MAX_QUEUE_SIZE:
                    logger.warning("Webhook retry store full, dropping retry for %s",
                                   delivery["url"])
                    return
                self._memory_retries[delivery["id"]] = (
                    next_retry_at,
                    {**delivery, "attempt": attempt},
                )
            return

        with self._db_lock:
            try:
                with contextlib.closing(sqlite3.connect(self._db_path)) as conn:
                    conn.execute(
                        """INSERT OR REPLACE INTO webhook_retries
                           (id, url, secret, payload, attempt, next_retry_at,
                            created_at, last_error)
                           VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                        (
                            delivery["id"],
                            delivery["url"],
                            delivery.get("secret", ""),
                            delivery["payload_bytes"].decode("utf-8", errors="replace"),
                            attempt,
                            next_retry_at,
                            time.time(),
                            error_msg,
                        ),
                    )
                    conn.commit()
            except sqlite3.Error as e:
                logger.error("Failed to store webhook retry: %s", e)

    def _remove_retry(self, webhook_id):
        """Remove a delivery from the retry store (no-op if absent)."""
        if not self._db_path:
            with self._db_lock:
                self._memory_retries.pop(webhook_id, None)
            return

        with self._db_lock:
            try:
                with contextlib.closing(sqlite3.connect(self._db_path)) as conn:
                    conn.execute("DELETE FROM webhook_retries WHERE id = ?",
                                 (webhook_id,))
                    conn.commit()
            except sqlite3.Error as e:
                logger.error("Failed to remove webhook retry: %s", e)

    def _claim_due_memory_retries(self, now):
        """Pop up to _RETRY_BATCH_SIZE due deliveries from the in-memory store."""
        with self._db_lock:
            due = sorted(
                (entry for entry in self._memory_retries.values() if entry[0] <= now),
                key=lambda entry: entry[0],
            )[:_RETRY_BATCH_SIZE]
            for _, delivery in due:
                del self._memory_retries[delivery["id"]]
        return [delivery for _, delivery in due]

    def _claim_due_db_retries(self, now):
        """Fetch and delete up to _RETRY_BATCH_SIZE due deliveries from SQLite.

        Returns an empty list if the database cannot be read or updated.
        """
        with self._db_lock:
            try:
                with contextlib.closing(sqlite3.connect(self._db_path)) as conn:
                    rows = conn.execute(
                        """SELECT id, url, secret, payload, attempt
                           FROM webhook_retries
                           WHERE next_retry_at <= ?
                           ORDER BY next_retry_at
                           LIMIT ?""",
                        (now, _RETRY_BATCH_SIZE),
                    ).fetchall()

                    # Delete fetched rows so they're not re-processed; a
                    # failed attempt re-inserts the row via _store_retry.
                    if rows:
                        conn.executemany(
                            "DELETE FROM webhook_retries WHERE id = ?",
                            [(row[0],) for row in rows],
                        )
                    conn.commit()
            except sqlite3.Error as e:
                logger.error("Failed to process webhook retries: %s", e)
                return []

        return [
            {
                "id": row[0],
                "url": row[1],
                "secret": row[2],
                "payload_bytes": row[3].encode("utf-8"),
                "attempt": row[4],
            }
            for row in rows
        ]

    def _process_retries(self):
        """Deliver stored retries whose scheduled time has arrived.

        Due deliveries are claimed (removed from the store) under the lock,
        then delivered outside it so network I/O never blocks other store
        access.
        """
        now = time.time()
        if self._db_path:
            due = self._claim_due_db_retries(now)
        else:
            due = self._claim_due_memory_retries(now)

        for delivery in due:
            # One bad delivery must not discard the rest of the claimed batch.
            self._deliver_safely(delivery)

    def get_retry_count(self):
        """Get the number of pending retries (for admin API).

        Returns 0 if the SQLite store cannot be read.
        """
        if not self._db_path:
            with self._db_lock:
                return len(self._memory_retries)

        with self._db_lock:
            try:
                with contextlib.closing(sqlite3.connect(self._db_path)) as conn:
                    cursor = conn.execute("SELECT COUNT(*) FROM webhook_retries")
                    return cursor.fetchone()[0]
            except sqlite3.Error:
                return 0