Spaces:
Paused
Paused
| """ | |
| Webhook Emitter | |
| Singleton that reads endpoint configs, fan-outs events to matching endpoints, | |
| and delegates delivery to WebhookDeliveryQueue. | |
| """ | |
| import json | |
| import logging | |
| from dataclasses import dataclass, field | |
| from typing import Any, Dict, List, Optional | |
| from .sender import WebhookDeliveryQueue | |
| logger = logging.getLogger(__name__) | |
| class WebhookEndpoint: | |
| """Configuration for a single webhook endpoint.""" | |
| name: str | |
| url: str | |
| secret: str = "" | |
| events: List[str] = field(default_factory=list) | |
| active: bool = True | |
| timeout_seconds: int = 10 | |
| class WebhookEmitter: | |
| """Manages webhook endpoint configs and event fan-out. | |
| Reads the 'webhooks' section of the Potato config and dispatches | |
| matching events to registered endpoints via a background delivery queue. | |
| """ | |
| def __init__(self, webhook_config: dict, full_config: dict = None): | |
| """Initialize from the webhooks config section. | |
| Args: | |
| webhook_config: The 'webhooks' section of the YAML config. | |
| full_config: Full Potato config (for output_dir resolution). | |
| """ | |
| self.endpoints: List[WebhookEndpoint] = [] | |
| self._delivery_queue = None | |
| self._stats = {"total_emitted": 0, "total_dropped": 0} | |
| # Parse endpoint configs | |
| for ep_dict in webhook_config.get("endpoints", []): | |
| ep = WebhookEndpoint( | |
| name=ep_dict.get("name", "unnamed"), | |
| url=ep_dict.get("url", ""), | |
| secret=ep_dict.get("secret", ""), | |
| events=ep_dict.get("events", []), | |
| active=ep_dict.get("active", True), | |
| timeout_seconds=ep_dict.get("timeout_seconds", 10), | |
| ) | |
| if ep.url: | |
| self.endpoints.append(ep) | |
| else: | |
| logger.warning("Skipping webhook endpoint '%s' with no URL", | |
| ep.name) | |
| # Resolve output dir for retry store | |
| output_dir = None | |
| if full_config: | |
| task_dir = full_config.get("task_dir", ".") | |
| output_dir_name = full_config.get("output_annotation_dir", | |
| "annotation_output") | |
| import os | |
| output_dir = os.path.join(task_dir, output_dir_name, ".webhooks") | |
| # Start delivery queue | |
| self._delivery_queue = WebhookDeliveryQueue(output_dir=output_dir) | |
| self._delivery_queue.start() | |
| def emit(self, event_type: str, payload: dict) -> int: | |
| """Emit an event to all matching endpoints. | |
| Args: | |
| event_type: Event type string (e.g., "annotation.created"). | |
| payload: Event payload dict (will be JSON-serialized). | |
| Returns: | |
| Number of endpoints the event was dispatched to. | |
| """ | |
| if not self.endpoints: | |
| return 0 | |
| dispatched = 0 | |
| payload_bytes = json.dumps(payload, ensure_ascii=False, | |
| default=str).encode("utf-8") | |
| for ep in self.endpoints: | |
| if not ep.active: | |
| continue | |
| # Match events: wildcard "*" matches all | |
| if "*" not in ep.events and event_type not in ep.events: | |
| continue | |
| success = self._delivery_queue.enqueue( | |
| url=ep.url, | |
| secret=ep.secret, | |
| payload_bytes=payload_bytes, | |
| ) | |
| if success: | |
| dispatched += 1 | |
| self._stats["total_emitted"] += 1 | |
| else: | |
| self._stats["total_dropped"] += 1 | |
| return dispatched | |
| def stop(self): | |
| """Stop the delivery queue.""" | |
| if self._delivery_queue: | |
| self._delivery_queue.stop() | |
| def get_stats(self) -> dict: | |
| """Get delivery statistics (for admin API).""" | |
| retry_count = 0 | |
| if self._delivery_queue: | |
| retry_count = self._delivery_queue.get_retry_count() | |
| return { | |
| "endpoints": len(self.endpoints), | |
| "active_endpoints": sum(1 for ep in self.endpoints if ep.active), | |
| "total_emitted": self._stats["total_emitted"], | |
| "total_dropped": self._stats["total_dropped"], | |
| "pending_retries": retry_count, | |
| } | |
| def get_endpoint_info(self) -> List[dict]: | |
| """Get endpoint info (for admin API, secrets redacted).""" | |
| return [ | |
| { | |
| "name": ep.name, | |
| "url": ep.url, | |
| "events": ep.events, | |
| "active": ep.active, | |
| "timeout_seconds": ep.timeout_seconds, | |
| "has_secret": bool(ep.secret), | |
| } | |
| for ep in self.endpoints | |
| ] | |