""" Response Worker - Consumes partner response events and processes accept/decline responses. This worker: 1. Consumes events from partner_response_queue 2. Validates event structure 3. Processes partner accept responses (assign booking) 4. Processes partner decline responses (trigger retry) 5. Handles errors with retry logic and DLQ 6. Implements graceful shutdown """ import asyncio import signal from datetime import datetime from typing import Set from uuid import UUID from app.core.logging import get_logger from app.models.offer import PartnerResponseEvent from app.queue.queue_manager import RedisQueueManager from app.services.notification_service import NotificationService from app.services.offer_service import OfferService logger = get_logger(__name__) class ResponseWorker: """ Background worker that processes partner response events. Responsibilities: - Consume events from partner_response_queue - Validate event structure (offer_id, partner_id, response) - Process accept responses (update offer and booking status) - Process decline responses (update offer status and trigger retry) - Handle errors with retry and DLQ - Graceful shutdown with in-flight task completion """ def __init__( self, queue_manager: RedisQueueManager, offer_service: OfferService, consumer_name: str, max_retries: int = 3 ): """ Initialize response worker. Args: queue_manager: Redis queue manager for event consumption offer_service: Service for offer response processing consumer_name: Unique name for this consumer instance max_retries: Maximum retry attempts for transient errors """ self.queue_manager = queue_manager self.offer_service = offer_service self.consumer_name = consumer_name self.max_retries = max_retries self.notification_service = NotificationService() # Graceful shutdown management self.shutdown_event = asyncio.Event() self.in_flight_tasks: Set[asyncio.Task] = set() # Queue configuration self.queue_name = "partner_response_queue" self.consumer_group = "response-workers" logger.info( "Initialized ResponseWorker", extra={ "consumer_name": consumer_name, "queue_name": self.queue_name, "consumer_group": self.consumer_group, "max_retries": max_retries } ) def setup_signal_handlers(self) -> None: """ Register signal handlers for graceful shutdown. Handles SIGTERM and SIGINT signals. """ def signal_handler(signum, frame): signal_name = signal.Signals(signum).name logger.info( f"Received signal {signal_name}, initiating graceful shutdown", extra={"signal": signal_name, "consumer_name": self.consumer_name} ) self.shutdown_event.set() signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) logger.info("Signal handlers registered for graceful shutdown") async def consume_response_events(self) -> None: """ Main event loop - consumes and processes partner response events. Implements: - Consumer group consumption from partner_response_queue - Graceful shutdown with in-flight task completion - Error handling and logging """ logger.info( "Starting partner response event consumption", extra={ "queue_name": self.queue_name, "consumer_group": self.consumer_group, "consumer_name": self.consumer_name } ) # Ensure consumer group exists await self.queue_manager.create_consumer_group( self.queue_name, self.consumer_group ) _loop_count = 0 try: # Consume events until shutdown signal while not self.shutdown_event.is_set(): messages = await self.queue_manager.consume_events( self.queue_name, self.consumer_group, self.consumer_name, count=1, block=5000, ) # N1 fix: reclaim stale PEL messages periodically inside the loop, # not just once at startup. Matches the pattern in allocation_worker. _loop_count += 1 if _loop_count % 30 == 0: await self._reclaim_stale_messages() if not messages: continue for _stream_name, message_list in messages: for message_id, data in message_list: if self.shutdown_event.is_set(): logger.info("Shutdown signal detected, stopping event consumption") return task = asyncio.create_task( self._process_event_with_tracking(message_id, data) ) self.in_flight_tasks.add(task) task.add_done_callback(self.in_flight_tasks.discard) except asyncio.CancelledError: logger.info("Event consumption cancelled") except Exception as e: logger.error( "Fatal error in event consumption loop", exc_info=e, extra={"consumer_name": self.consumer_name} ) finally: # Wait for in-flight tasks to complete if self.in_flight_tasks: logger.info( f"Waiting for {len(self.in_flight_tasks)} in-flight tasks to complete", extra={"in_flight_count": len(self.in_flight_tasks)} ) await asyncio.gather(*self.in_flight_tasks, return_exceptions=True) logger.info("Response worker shutdown complete") async def _process_event_with_tracking(self, message_id: str, data: dict) -> None: """ Process event with retry logic and error handling. Args: message_id: Redis stream message ID (used for XACK) data: Event field dict from xreadgroup """ retry_count = 0 base_delay = 1.0 for attempt in range(self.max_retries): try: # Process the response event await self.process_response_event(data) # Acknowledge successful processing await self.queue_manager.acknowledge_event( self.queue_name, self.consumer_group, message_id ) logger.debug( "Event processed and acknowledged successfully", extra={"message_id": message_id, "attempt": attempt + 1} ) return except ValueError as e: # Permanent error - invalid event structure logger.error( "Invalid event structure, discarding", extra={ "message_id": message_id, "error": str(e), "event_data": data } ) # Move to DLQ and acknowledge await self.queue_manager.move_to_dead_letter_queue( data, e, self.queue_name, retry_count=attempt ) await self.queue_manager.acknowledge_event( self.queue_name, self.consumer_group, message_id ) return except Exception as e: retry_count = attempt + 1 if attempt < self.max_retries - 1: # Transient error - retry with exponential backoff delay = base_delay * (2 ** attempt) logger.warning( f"Event processing failed, retrying in {delay}s", extra={ "message_id": message_id, "attempt": retry_count, "max_retries": self.max_retries, "error": str(e), "error_type": type(e).__name__ } ) await asyncio.sleep(delay) else: # Max retries exceeded - move to DLQ logger.error( f"Event processing failed after {self.max_retries} attempts", exc_info=e, extra={ "message_id": message_id, "error": str(e), "error_type": type(e).__name__ } ) await self.queue_manager.move_to_dead_letter_queue( data, e, self.queue_name, retry_count=retry_count ) await self.queue_manager.acknowledge_event( self.queue_name, self.consumer_group, message_id ) return async def process_response_event(self, data: dict) -> None: """ Process a single partner response event. Steps: 1. Validate event structure (offer_id, partner_id, response) 2. If response='accept', call offer_service.accept_offer 3. If response='decline', call offer_service.decline_offer and trigger retry 4. Log response processing details Args: data: Event field dict from xreadgroup Raises: ValueError: If event structure is invalid Exception: For transient errors (database, Redis, etc.) """ # Step 1: Validate and parse event try: response_event = self._validate_event(data) except ValueError as e: logger.error( "Event validation failed", extra={ "error": str(e), "event_data": data } ) raise offer_id = response_event.offer_id booking_id = response_event.booking_id partner_id = response_event.partner_id response = response_event.response response_time = response_event.response_time logger.info( "Processing partner response event", extra={ "offer_id": str(offer_id), "booking_id": str(booking_id), "partner_id": str(partner_id), "response": response, "response_time": response_time.isoformat() } ) # Step 2 & 3: Process based on response type if response == "accept": # Process accept response success = await self.offer_service.accept_offer(offer_id) if success: logger.info( "Partner accepted offer, booking assigned", extra={ "offer_id": str(offer_id), "booking_id": str(booking_id), "partner_id": str(partner_id), "response_timestamp": datetime.utcnow().isoformat() } ) await self.notification_service.send_customer_booking_assigned( booking_id=booking_id, partner_id=partner_id, ) await self.notification_service.send_acceptance_confirmation( partner_id=partner_id, offer_id=offer_id, booking_id=booking_id, ) else: logger.warning( "Accept response processing failed (offer may already be responded)", extra={ "offer_id": str(offer_id), "booking_id": str(booking_id), "partner_id": str(partner_id) } ) elif response == "decline": # Process decline response success = await self.offer_service.decline_offer(offer_id) if success: logger.info( "Partner declined offer, triggering allocation retry", extra={ "offer_id": str(offer_id), "booking_id": str(booking_id), "partner_id": str(partner_id), "response_timestamp": datetime.utcnow().isoformat() } ) await self.notification_service.send_withdrawal_notification( partner_id=partner_id, offer_id=offer_id, booking_id=booking_id, ) # Trigger allocation retry by publishing to booking_allocation_queue retry_event = { "booking_id": str(booking_id), "retry_reason": "partner_declined", "previous_partner_id": str(partner_id), "previous_offer_id": str(offer_id) } await self.queue_manager.publish_event( "booking_allocation_queue", retry_event ) logger.info( "Allocation retry event published", extra={ "booking_id": str(booking_id), "retry_reason": "partner_declined", "previous_partner_id": str(partner_id), "next_partner_id": "to_be_determined" } ) else: logger.warning( "Decline response processing failed (offer may already be responded)", extra={ "offer_id": str(offer_id), "booking_id": str(booking_id), "partner_id": str(partner_id) } ) else: # Invalid response value raise ValueError(f"Invalid response value: {response}. Must be 'accept' or 'decline'") def _validate_event(self, event: dict) -> PartnerResponseEvent: """ Validate event structure and parse into PartnerResponseEvent model. Args: event: Raw event dictionary from queue Returns: Validated PartnerResponseEvent object Raises: ValueError: If event structure is invalid or missing required fields """ required_fields = [ "offer_id", "booking_id", "partner_id", "response", "response_time" ] # Check for required fields missing_fields = [field for field in required_fields if field not in event] if missing_fields: raise ValueError( f"Event missing required fields: {', '.join(missing_fields)}" ) # Validate response value if event["response"] not in ["accept", "decline"]: raise ValueError( f"Invalid response value: {event['response']}. Must be 'accept' or 'decline'" ) # Parse into PartnerResponseEvent model (Pydantic will validate types) try: response_event = PartnerResponseEvent( offer_id=UUID(event["offer_id"]), booking_id=UUID(event["booking_id"]), partner_id=UUID(event["partner_id"]), response=event["response"], response_time=datetime.fromisoformat(event["response_time"]) ) return response_event except (ValueError, TypeError, KeyError) as e: raise ValueError(f"Event validation failed: {str(e)}") async def _reclaim_stale_messages(self) -> None: """ Gap 4 fix: Reclaim PEL messages idle > 5min from a previously crashed instance. Must be called from within this worker — reclaiming to self.consumer_name ensures messages are assigned to a consumer that actually reads from this queue. The scheduler must NOT do this on our behalf. """ try: claimed, _, _ = await self.queue_manager.redis.xautoclaim( self.queue_name, self.consumer_group, self.consumer_name, min_idle_time=60_000, # 1 minute in ms (safe: healthy consumers ack within seconds) start_id="0-0", count=10, ) if claimed: logger.info( "Reclaimed stale PEL messages", extra={ "queue": self.queue_name, "count": len(claimed), "consumer": self.consumer_name, }, ) except Exception as e: # Consumer group may not exist yet at first startup — not fatal logger.debug("PEL reclaim skipped", extra={"queue": self.queue_name, "error": str(e)}) async def run(self) -> None: """ Run the response worker. Sets up signal handlers, reclaims any stale PEL messages from a previously crashed instance, then starts event consumption. """ logger.info( "Starting ResponseWorker", extra={"consumer_name": self.consumer_name} ) # Setup signal handlers for graceful shutdown self.setup_signal_handlers() # Gap 4 fix: reclaim stale messages before entering the consume loop await self._reclaim_stale_messages() # Start consuming events await self.consume_response_events() logger.info( "ResponseWorker stopped", extra={"consumer_name": self.consumer_name} ) async def main(): """ Main entry point for response worker. Initializes all dependencies and starts the worker. """ from app.core.config import get_settings from app.core.logging import setup_logging from app.db.postgres import create_pg_engine from app.queue.redis_client import create_redis_client settings = get_settings() setup_logging(settings.LOG_LEVEL) logger.info("Initializing Response Worker") # Initialize database engine engine = await create_pg_engine() # Initialize Redis client redis_client = await create_redis_client() # Initialize queue manager queue_manager = RedisQueueManager(redis_client) # Initialize offer service offer_service = OfferService(engine, queue_manager) # Generate unique consumer name (use pod name in Kubernetes) import os import socket consumer_name = os.getenv("HOSTNAME", socket.gethostname()) # Initialize worker worker = ResponseWorker( queue_manager=queue_manager, offer_service=offer_service, consumer_name=consumer_name ) # Run worker try: await worker.run() finally: # Cleanup await queue_manager.close() await engine.dispose() logger.info("Response Worker shutdown complete") if __name__ == "__main__": asyncio.run(main())