"""
Response Worker - Consumes partner response events and processes accept/decline responses.

This worker:
1. Consumes events from partner_response_queue
2. Validates event structure
3. Processes partner accept responses (assign booking)
4. Processes partner decline responses (trigger allocation retry)
5. Handles errors with retry logic and a dead-letter queue (DLQ)
6. Implements graceful shutdown
"""
import asyncio
import signal
from datetime import datetime, timezone
from typing import Set
from uuid import UUID

from app.core.logging import get_logger
from app.models.offer import PartnerResponseEvent
from app.queue.queue_manager import RedisQueueManager
from app.services.notification_service import NotificationService
from app.services.offer_service import OfferService
|
|
# Module-level logger used by ResponseWorker and main() below.
logger = get_logger(__name__)
|
|
|
|
class ResponseWorker:
    """
    Background worker that processes partner response events.

    Responsibilities:
    - Consume events from partner_response_queue
    - Validate event structure (offer_id, booking_id, partner_id,
      response, response_time)
    - Process accept responses (accept the offer, notify customer and partner)
    - Process decline responses (decline the offer, notify the partner and
      publish an allocation retry event)
    - Handle errors with retry and DLQ
    - Graceful shutdown with in-flight task completion
    """

    def __init__(
        self,
        queue_manager: "RedisQueueManager",
        offer_service: "OfferService",
        consumer_name: str,
        max_retries: int = 3,
    ):
        """
        Initialize response worker.

        Args:
            queue_manager: Redis queue manager for event consumption
            offer_service: Service for offer response processing
            consumer_name: Unique name for this consumer instance
            max_retries: Maximum processing attempts per event before the
                event is moved to the dead-letter queue
        """
        self.queue_manager = queue_manager
        self.offer_service = offer_service
        self.consumer_name = consumer_name
        self.max_retries = max_retries
        self.notification_service = NotificationService()

        # Set by the signal handlers; polled by the consume loop.
        self.shutdown_event = asyncio.Event()
        # Tasks currently processing an event; awaited during shutdown.
        self.in_flight_tasks: Set[asyncio.Task] = set()

        self.queue_name = "partner_response_queue"
        self.consumer_group = "response-workers"

        logger.info(
            "Initialized ResponseWorker",
            extra={
                "consumer_name": consumer_name,
                "queue_name": self.queue_name,
                "consumer_group": self.consumer_group,
                "max_retries": max_retries,
            },
        )

    def setup_signal_handlers(self) -> None:
        """
        Register signal handlers for graceful shutdown.

        Handles SIGTERM and SIGINT signals. The handler only sets
        ``shutdown_event``; the consume loop notices it on its next check.
        """
        def signal_handler(signum, frame):
            signal_name = signal.Signals(signum).name
            logger.info(
                f"Received signal {signal_name}, initiating graceful shutdown",
                extra={"signal": signal_name, "consumer_name": self.consumer_name},
            )
            self.shutdown_event.set()

        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)

        logger.info("Signal handlers registered for graceful shutdown")

    async def consume_response_events(self) -> None:
        """
        Main event loop - consumes and processes partner response events.

        Implements:
        - Consumer group consumption from partner_response_queue
        - Periodic reclaim of stale pending entries (every 30th poll)
        - Graceful shutdown with in-flight task completion
        - Error handling and logging

        Cancellation and unexpected errors are logged rather than re-raised;
        in every case the in-flight tasks are awaited before returning.
        """
        logger.info(
            "Starting partner response event consumption",
            extra={
                "queue_name": self.queue_name,
                "consumer_group": self.consumer_group,
                "consumer_name": self.consumer_name,
            },
        )

        await self.queue_manager.create_consumer_group(
            self.queue_name,
            self.consumer_group,
        )

        poll_count = 0
        try:
            while not self.shutdown_event.is_set():
                # block=5000: presumably a 5 s blocking read (milliseconds) —
                # TODO confirm against RedisQueueManager.consume_events.
                messages = await self.queue_manager.consume_events(
                    self.queue_name,
                    self.consumer_group,
                    self.consumer_name,
                    count=1,
                    block=5000,
                )

                poll_count += 1
                if poll_count % 30 == 0:
                    await self._reclaim_stale_messages()

                if not messages:
                    continue

                for _stream_name, message_list in messages:
                    for message_id, data in message_list:
                        if self.shutdown_event.is_set():
                            logger.info("Shutdown signal detected, stopping event consumption")
                            return

                        # Process concurrently; the done-callback drops the
                        # task from the tracking set once it finishes.
                        task = asyncio.create_task(
                            self._process_event_with_tracking(message_id, data)
                        )
                        self.in_flight_tasks.add(task)
                        task.add_done_callback(self.in_flight_tasks.discard)

        except asyncio.CancelledError:
            logger.info("Event consumption cancelled")
        except Exception as e:
            logger.error(
                "Fatal error in event consumption loop",
                exc_info=e,
                extra={"consumer_name": self.consumer_name},
            )
        finally:
            if self.in_flight_tasks:
                logger.info(
                    f"Waiting for {len(self.in_flight_tasks)} in-flight tasks to complete",
                    extra={"in_flight_count": len(self.in_flight_tasks)},
                )
                await asyncio.gather(*self.in_flight_tasks, return_exceptions=True)

            logger.info("Response worker shutdown complete")

    async def _process_event_with_tracking(self, message_id: str, data: dict) -> None:
        """
        Process event with retry logic and error handling.

        A ``ValueError`` is treated as a permanently invalid event: it is
        moved to the dead-letter queue and acknowledged without retrying.
        Any other exception is retried with exponential backoff (1s, 2s,
        4s, ...) and dead-lettered after the final attempt.

        Args:
            message_id: Redis stream message ID (used for XACK)
            data: Event field dict from xreadgroup
        """
        base_delay = 1.0
        # Always make at least one attempt, even if max_retries was
        # configured as 0 or negative; otherwise the event would be
        # neither processed nor acknowledged.
        total_attempts = max(1, self.max_retries)

        for attempt in range(total_attempts):
            try:
                await self.process_response_event(data)

                await self.queue_manager.acknowledge_event(
                    self.queue_name,
                    self.consumer_group,
                    message_id,
                )

                logger.debug(
                    "Event processed and acknowledged successfully",
                    extra={"message_id": message_id, "attempt": attempt + 1},
                )
                return

            except ValueError as e:
                logger.error(
                    "Invalid event structure, discarding",
                    extra={
                        "message_id": message_id,
                        "error": str(e),
                        "event_data": data,
                    },
                )
                await self._dead_letter_and_ack(message_id, data, e, retry_count=attempt)
                return

            except Exception as e:
                retry_count = attempt + 1

                if attempt < total_attempts - 1:
                    delay = base_delay * (2 ** attempt)
                    logger.warning(
                        f"Event processing failed, retrying in {delay}s",
                        extra={
                            "message_id": message_id,
                            "attempt": retry_count,
                            "max_retries": self.max_retries,
                            "error": str(e),
                            "error_type": type(e).__name__,
                        },
                    )
                    await asyncio.sleep(delay)
                else:
                    logger.error(
                        f"Event processing failed after {total_attempts} attempts",
                        exc_info=e,
                        extra={
                            "message_id": message_id,
                            "error": str(e),
                            "error_type": type(e).__name__,
                        },
                    )
                    await self._dead_letter_and_ack(
                        message_id, data, e, retry_count=retry_count
                    )
                    return

    async def _dead_letter_and_ack(
        self,
        message_id: str,
        data: dict,
        error: Exception,
        retry_count: int,
    ) -> None:
        """Move a failed event to the dead-letter queue, then acknowledge it."""
        await self.queue_manager.move_to_dead_letter_queue(
            data, error, self.queue_name, retry_count=retry_count
        )
        await self.queue_manager.acknowledge_event(
            self.queue_name,
            self.consumer_group,
            message_id,
        )

    async def process_response_event(self, data: dict) -> None:
        """
        Process a single partner response event.

        Steps:
        1. Validate event structure
        2. If response='accept', call offer_service.accept_offer and send
           the customer/partner notifications
        3. If response='decline', call offer_service.decline_offer, send the
           withdrawal notification and publish an allocation retry event
        4. Log response processing details

        Notification failures are logged and do not raise, so they cannot
        prevent the remaining steps (in particular the allocation retry
        publish) from running.

        Args:
            data: Event field dict from xreadgroup

        Raises:
            ValueError: If event structure is invalid
            Exception: For errors raised by the offer service or the queue
        """
        try:
            response_event = self._validate_event(data)
        except ValueError as e:
            logger.error(
                "Event validation failed",
                extra={
                    "error": str(e),
                    "event_data": data,
                },
            )
            raise

        offer_id = response_event.offer_id
        booking_id = response_event.booking_id
        partner_id = response_event.partner_id
        response = response_event.response
        response_time = response_event.response_time

        logger.info(
            "Processing partner response event",
            extra={
                "offer_id": str(offer_id),
                "booking_id": str(booking_id),
                "partner_id": str(partner_id),
                "response": response,
                "response_time": response_time.isoformat(),
            },
        )

        if response == "accept":
            await self._handle_accept(offer_id, booking_id, partner_id)
        elif response == "decline":
            await self._handle_decline(offer_id, booking_id, partner_id)
        else:
            # Defensive: _validate_event already rejects other values.
            raise ValueError(f"Invalid response value: {response}. Must be 'accept' or 'decline'")

    async def _handle_accept(self, offer_id: UUID, booking_id: UUID, partner_id: UUID) -> None:
        """Accept the offer and, on success, notify the customer and the partner."""
        ids = {
            "offer_id": str(offer_id),
            "booking_id": str(booking_id),
            "partner_id": str(partner_id),
        }

        success = await self.offer_service.accept_offer(offer_id)
        if not success:
            logger.warning(
                "Accept response processing failed (offer may already be responded)",
                extra=dict(ids),
            )
            return

        logger.info(
            "Partner accepted offer, booking assigned",
            extra={
                **ids,
                "response_timestamp": datetime.now(timezone.utc).isoformat(),
            },
        )
        await self._notify(
            self.notification_service.send_customer_booking_assigned,
            ids,
            booking_id=booking_id,
            partner_id=partner_id,
        )
        await self._notify(
            self.notification_service.send_acceptance_confirmation,
            ids,
            partner_id=partner_id,
            offer_id=offer_id,
            booking_id=booking_id,
        )

    async def _handle_decline(self, offer_id: UUID, booking_id: UUID, partner_id: UUID) -> None:
        """Decline the offer and, on success, notify the partner and publish an allocation retry."""
        ids = {
            "offer_id": str(offer_id),
            "booking_id": str(booking_id),
            "partner_id": str(partner_id),
        }

        success = await self.offer_service.decline_offer(offer_id)
        if not success:
            logger.warning(
                "Decline response processing failed (offer may already be responded)",
                extra=dict(ids),
            )
            return

        logger.info(
            "Partner declined offer, triggering allocation retry",
            extra={
                **ids,
                "response_timestamp": datetime.now(timezone.utc).isoformat(),
            },
        )
        await self._notify(
            self.notification_service.send_withdrawal_notification,
            ids,
            partner_id=partner_id,
            offer_id=offer_id,
            booking_id=booking_id,
        )

        retry_event = {
            "booking_id": str(booking_id),
            "retry_reason": "partner_declined",
            "previous_partner_id": str(partner_id),
            "previous_offer_id": str(offer_id),
        }

        await self.queue_manager.publish_event(
            "booking_allocation_queue",
            retry_event,
        )

        logger.info(
            "Allocation retry event published",
            extra={
                "booking_id": str(booking_id),
                "retry_reason": "partner_declined",
                "previous_partner_id": str(partner_id),
                "next_partner_id": "to_be_determined",
            },
        )

    async def _notify(self, send, log_context: dict, **kwargs) -> None:
        """
        Await one notification call, logging (not raising) any failure.

        The offer state change has already happened when this runs. If a
        notification error propagated, the whole event would be retried and
        the offer service would then report the offer as already responded
        (see the warning branches in the accept/decline handlers), so the
        steps after the notification would never run.
        """
        try:
            await send(**kwargs)
        except Exception as e:
            logger.warning(
                "Notification delivery failed, continuing",
                exc_info=e,
                extra={
                    **log_context,
                    "notification": getattr(send, "__name__", repr(send)),
                    "error": str(e),
                    "error_type": type(e).__name__,
                },
            )

    def _validate_event(self, event: dict) -> "PartnerResponseEvent":
        """
        Validate event structure and parse into PartnerResponseEvent model.

        Args:
            event: Raw event dictionary from queue

        Returns:
            Validated PartnerResponseEvent object

        Raises:
            ValueError: If event structure is invalid or missing required fields
        """
        required_fields = (
            "offer_id",
            "booking_id",
            "partner_id",
            "response",
            "response_time",
        )

        missing_fields = [field for field in required_fields if field not in event]
        if missing_fields:
            raise ValueError(
                f"Event missing required fields: {', '.join(missing_fields)}"
            )

        if event["response"] not in ("accept", "decline"):
            raise ValueError(
                f"Invalid response value: {event['response']}. Must be 'accept' or 'decline'"
            )

        try:
            # Parse the raw fields first so that malformed values are
            # reported as validation errors. UUID() raises AttributeError
            # for non-string input such as an int, hence the extra type.
            offer_id = UUID(event["offer_id"])
            booking_id = UUID(event["booking_id"])
            partner_id = UUID(event["partner_id"])
            response_time = datetime.fromisoformat(event["response_time"])

            return PartnerResponseEvent(
                offer_id=offer_id,
                booking_id=booking_id,
                partner_id=partner_id,
                response=event["response"],
                response_time=response_time,
            )
        except (ValueError, TypeError, KeyError, AttributeError) as e:
            raise ValueError(f"Event validation failed: {str(e)}") from e

    async def _reclaim_stale_messages(self) -> None:
        """
        Reclaim pending (PEL) entries that have been idle for at least 60s.

        Uses XAUTOCLAIM to transfer up to 10 such entries of this consumer
        group to ``self.consumer_name``, e.g. entries left behind by a
        crashed instance. Failures are logged at debug level and ignored.

        NOTE(review): claimed entries are only counted and logged here, not
        dispatched for processing. Whether they are picked up afterwards
        depends on how ``queue_manager.consume_events`` reads the stream —
        confirm, otherwise reclaimed events will never be processed.
        """
        try:
            reply = await self.queue_manager.redis.xautoclaim(
                self.queue_name,
                self.consumer_group,
                self.consumer_name,
                min_idle_time=60_000,
                start_id="0-0",
                count=10,
            )
            # XAUTOCLAIM replies [next_start_id, claimed_entries] and, on
            # newer servers, a third element with deleted IDs. The claimed
            # entries are the SECOND element, not the first.
            claimed = reply[1] if len(reply) > 1 else []
            if claimed:
                logger.info(
                    "Reclaimed stale PEL messages",
                    extra={
                        "queue": self.queue_name,
                        "count": len(claimed),
                        "consumer": self.consumer_name,
                    },
                )
        except Exception as e:
            # Best effort: a failed reclaim must not stop the worker.
            logger.debug("PEL reclaim skipped", extra={"queue": self.queue_name, "error": str(e)})

    async def run(self) -> None:
        """
        Run the response worker.

        Sets up signal handlers, reclaims any stale PEL messages from a
        previously crashed instance, then starts event consumption.
        """
        logger.info(
            "Starting ResponseWorker",
            extra={"consumer_name": self.consumer_name},
        )

        self.setup_signal_handlers()

        await self._reclaim_stale_messages()

        await self.consume_response_events()

        logger.info(
            "ResponseWorker stopped",
            extra={"consumer_name": self.consumer_name},
        )
|
|
|
|
async def main():
    """
    Main entry point for response worker.

    Configures logging, creates the Postgres engine and Redis-backed queue
    manager, builds the worker and runs it. The queue manager is closed and
    the engine disposed when the worker returns or raises.
    """
    import os
    import socket

    from app.core.config import get_settings
    from app.core.logging import setup_logging
    from app.db.postgres import create_pg_engine
    from app.queue.redis_client import create_redis_client

    setup_logging(get_settings().LOG_LEVEL)

    logger.info("Initializing Response Worker")

    # Infrastructure: database engine first, then the Redis-backed queue.
    pg_engine = await create_pg_engine()
    queue = RedisQueueManager(await create_redis_client())

    # The consumer name is the HOSTNAME env var, falling back to the
    # machine's hostname.
    response_worker = ResponseWorker(
        queue_manager=queue,
        offer_service=OfferService(pg_engine, queue),
        consumer_name=os.environ.get("HOSTNAME", socket.gethostname()),
    )

    try:
        await response_worker.run()
    finally:
        # Release external resources regardless of how the worker exited.
        await queue.close()
        await pg_engine.dispose()
        logger.info("Response Worker shutdown complete")
|
|
|
|
# Script entry point: start the worker only when this module is run directly.
if __name__ == "__main__":
    asyncio.run(main())
|
|