# Author: Michael-Antony
# Commit 513fe6b: "feat: Initialize logging in all worker entry points"
"""
Response Worker - Consumes partner response events and processes accept/decline responses.
This worker:
1. Consumes events from partner_response_queue
2. Validates event structure
3. Processes partner accept responses (assign booking)
4. Processes partner decline responses (trigger retry)
5. Handles errors with retry logic and DLQ
6. Implements graceful shutdown
"""
import asyncio
import signal
from datetime import datetime, timezone
from typing import Set
from uuid import UUID

from app.core.logging import get_logger
from app.models.offer import PartnerResponseEvent
from app.queue.queue_manager import RedisQueueManager
from app.services.notification_service import NotificationService
from app.services.offer_service import OfferService
# Module-level logger obtained from the project's logging factory (app.core.logging).
logger = get_logger(__name__)
class ResponseWorker:
    """
    Background worker that processes partner response events.

    Responsibilities:
    - Consume events from partner_response_queue
    - Validate event structure (offer_id, booking_id, partner_id, response,
      response_time)
    - Process accept responses via OfferService.accept_offer, then notify the
      customer and the partner
    - Process decline responses via OfferService.decline_offer, then publish an
      allocation retry to booking_allocation_queue
    - Retry transient failures with exponential backoff; dead-letter the rest
    - Periodically reclaim and reprocess stale pending (PEL) messages
    - Graceful shutdown with in-flight task completion
    """

    def __init__(
        self,
        queue_manager: RedisQueueManager,
        offer_service: OfferService,
        consumer_name: str,
        max_retries: int = 3
    ):
        """
        Initialize response worker.

        Args:
            queue_manager: Redis queue manager for event consumption
            offer_service: Service for offer response processing
            consumer_name: Unique name for this consumer instance
            max_retries: Maximum processing attempts per event for transient
                errors (values below 1 are treated as a single attempt)
        """
        self.queue_manager = queue_manager
        self.offer_service = offer_service
        self.consumer_name = consumer_name
        self.max_retries = max_retries
        self.notification_service = NotificationService()

        # Graceful shutdown management
        self.shutdown_event = asyncio.Event()
        self.in_flight_tasks: Set[asyncio.Task] = set()
        # Stream message IDs currently being processed by this instance, so a
        # periodic PEL reclaim never dispatches a duplicate of a message that
        # is still in flight here (e.g. sleeping in retry backoff).
        self._in_flight_ids: Set[str] = set()

        # Queue configuration
        self.queue_name = "partner_response_queue"
        self.consumer_group = "response-workers"

        logger.info(
            "Initialized ResponseWorker",
            extra={
                "consumer_name": consumer_name,
                "queue_name": self.queue_name,
                "consumer_group": self.consumer_group,
                "max_retries": max_retries
            }
        )

    def setup_signal_handlers(self) -> None:
        """
        Register signal handlers for graceful shutdown.

        Handles SIGTERM and SIGINT by setting ``shutdown_event``; the consume
        loop checks that event and stops reading new messages.
        """
        def signal_handler(signum, frame):
            signal_name = signal.Signals(signum).name
            logger.info(
                f"Received signal {signal_name}, initiating graceful shutdown",
                extra={"signal": signal_name, "consumer_name": self.consumer_name}
            )
            self.shutdown_event.set()

        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)
        logger.info("Signal handlers registered for graceful shutdown")

    def _dispatch(self, message_id: str, data: dict) -> None:
        """Schedule processing of one stream message as a tracked background task.

        The task is kept in ``in_flight_tasks`` (awaited on shutdown) and its
        message ID in ``_in_flight_ids`` (consulted by the PEL reclaim) until
        it finishes.
        """
        self._in_flight_ids.add(message_id)
        task = asyncio.create_task(
            self._process_event_with_tracking(message_id, data)
        )
        self.in_flight_tasks.add(task)

        def _on_done(done: asyncio.Task) -> None:
            self.in_flight_tasks.discard(done)
            self._in_flight_ids.discard(message_id)

        task.add_done_callback(_on_done)

    async def consume_response_events(self) -> None:
        """
        Main event loop - consumes and processes partner response events.

        Implements:
        - Consumer group consumption from partner_response_queue
        - Periodic reclaim of stale PEL messages (every 30 loop iterations)
        - Graceful shutdown with in-flight task completion
        - Error handling and logging
        """
        logger.info(
            "Starting partner response event consumption",
            extra={
                "queue_name": self.queue_name,
                "consumer_group": self.consumer_group,
                "consumer_name": self.consumer_name
            }
        )

        # Ensure consumer group exists
        await self.queue_manager.create_consumer_group(
            self.queue_name,
            self.consumer_group
        )

        _loop_count = 0
        try:
            # Consume events until shutdown signal
            while not self.shutdown_event.is_set():
                messages = await self.queue_manager.consume_events(
                    self.queue_name,
                    self.consumer_group,
                    self.consumer_name,
                    count=1,
                    block=5000,
                )

                # N1 fix: reclaim stale PEL messages periodically inside the loop,
                # not just once at startup. Matches the pattern in allocation_worker.
                _loop_count += 1
                if _loop_count % 30 == 0:
                    await self._reclaim_stale_messages()

                if not messages:
                    continue

                for _stream_name, message_list in messages:
                    for message_id, data in message_list:
                        if self.shutdown_event.is_set():
                            logger.info("Shutdown signal detected, stopping event consumption")
                            return
                        self._dispatch(message_id, data)
        except asyncio.CancelledError:
            logger.info("Event consumption cancelled")
        except Exception as e:
            logger.error(
                "Fatal error in event consumption loop",
                exc_info=e,
                extra={"consumer_name": self.consumer_name}
            )
        finally:
            # Wait for in-flight tasks to complete
            if self.in_flight_tasks:
                logger.info(
                    f"Waiting for {len(self.in_flight_tasks)} in-flight tasks to complete",
                    extra={"in_flight_count": len(self.in_flight_tasks)}
                )
                await asyncio.gather(*self.in_flight_tasks, return_exceptions=True)
            logger.info("Response worker shutdown complete")

    async def _acknowledge(self, message_id: str) -> bool:
        """XACK one message, logging instead of raising on failure.

        A failed ACK leaves the message pending in this consumer's PEL, where
        the periodic reclaim can pick it up again once it has been idle long
        enough.

        Returns:
            True if the acknowledgement call completed, False otherwise.
        """
        try:
            await self.queue_manager.acknowledge_event(
                self.queue_name,
                self.consumer_group,
                message_id
            )
        except Exception as e:
            logger.error(
                "Failed to acknowledge event; leaving it pending for reclaim",
                exc_info=e,
                extra={"message_id": message_id}
            )
            return False
        return True

    async def _dead_letter(
        self, message_id: str, data: dict, error: Exception, retry_count: int
    ) -> None:
        """Move a message to the DLQ and acknowledge it.

        If the DLQ move itself fails, the message is NOT acknowledged (so it is
        not lost); it stays pending for a later reclaim. Errors are logged, not
        raised, because this runs inside a fire-and-forget task.
        """
        try:
            await self.queue_manager.move_to_dead_letter_queue(
                data, error, self.queue_name, retry_count=retry_count
            )
        except Exception as dlq_error:
            logger.error(
                "Failed to move event to DLQ; leaving it pending for reclaim",
                exc_info=dlq_error,
                extra={"message_id": message_id}
            )
            return
        await self._acknowledge(message_id)

    async def _process_event_with_tracking(self, message_id: str, data: dict) -> None:
        """
        Process event with retry logic and error handling.

        - Success: the message is acknowledged.
        - ValueError (invalid event): dead-lettered immediately, no retry.
        - Any other Exception: retried with exponential backoff (1s, 2s, ...)
          up to ``max_retries`` attempts, then dead-lettered.

        Args:
            message_id: Redis stream message ID (used for XACK)
            data: Event field dict from xreadgroup
        """
        base_delay = 1.0
        # At least one attempt: with max_retries <= 0 the message would
        # otherwise never be processed, acknowledged or dead-lettered.
        attempts = max(1, self.max_retries)

        for attempt in range(attempts):
            try:
                # Process the response event
                await self.process_response_event(data)
            except ValueError as e:
                # Permanent error - invalid event structure
                logger.error(
                    "Invalid event structure, discarding",
                    extra={
                        "message_id": message_id,
                        "error": str(e),
                        "event_data": data
                    }
                )
                await self._dead_letter(message_id, data, e, retry_count=attempt)
                return
            except Exception as e:
                retry_count = attempt + 1
                if retry_count < attempts:
                    # Transient error - retry with exponential backoff
                    delay = base_delay * (2 ** attempt)
                    logger.warning(
                        f"Event processing failed, retrying in {delay}s",
                        extra={
                            "message_id": message_id,
                            "attempt": retry_count,
                            "max_retries": self.max_retries,
                            "error": str(e),
                            "error_type": type(e).__name__
                        }
                    )
                    await asyncio.sleep(delay)
                    continue

                # Max retries exceeded - move to DLQ
                logger.error(
                    f"Event processing failed after {attempts} attempts",
                    exc_info=e,
                    extra={
                        "message_id": message_id,
                        "error": str(e),
                        "error_type": type(e).__name__
                    }
                )
                await self._dead_letter(message_id, data, e, retry_count=retry_count)
                return
            else:
                # Acknowledge outside the try so an ACK failure is not mistaken
                # for a processing failure (which would re-run the side effects).
                if await self._acknowledge(message_id):
                    logger.debug(
                        "Event processed and acknowledged successfully",
                        extra={"message_id": message_id, "attempt": attempt + 1}
                    )
                return

    async def process_response_event(self, data: dict) -> None:
        """
        Process a single partner response event.

        Steps:
        1. Validate event structure (offer_id, booking_id, partner_id,
           response, response_time)
        2. If response='accept', call offer_service.accept_offer and notify
        3. If response='decline', call offer_service.decline_offer and publish
           an allocation retry
        4. Log response processing details

        Args:
            data: Event field dict from xreadgroup

        Raises:
            ValueError: If event structure is invalid
            Exception: For transient errors (database, Redis, etc.)
        """
        # Step 1: Validate and parse event
        try:
            response_event = self._validate_event(data)
        except ValueError as e:
            logger.error(
                "Event validation failed",
                extra={
                    "error": str(e),
                    "event_data": data
                }
            )
            raise

        offer_id = response_event.offer_id
        booking_id = response_event.booking_id
        partner_id = response_event.partner_id
        response = response_event.response
        response_time = response_event.response_time

        logger.info(
            "Processing partner response event",
            extra={
                **self._id_context(offer_id, booking_id, partner_id),
                "response": response,
                "response_time": response_time.isoformat()
            }
        )

        # Step 2 & 3: Process based on response type
        if response == "accept":
            await self._handle_accept(offer_id, booking_id, partner_id)
        elif response == "decline":
            await self._handle_decline(offer_id, booking_id, partner_id)
        else:
            # Defensive: _validate_event already rejects other values.
            raise ValueError(f"Invalid response value: {response}. Must be 'accept' or 'decline'")

    @staticmethod
    def _id_context(offer_id: UUID, booking_id: UUID, partner_id: UUID) -> dict:
        """Build the common string-valued ID fields used in log ``extra``."""
        return {
            "offer_id": str(offer_id),
            "booking_id": str(booking_id),
            "partner_id": str(partner_id)
        }

    async def _send_notification(self, notification, kind: str, context: dict) -> None:
        """Await a notification coroutine, logging (not raising) any failure.

        Notifications are sent only after accept_offer/decline_offer reported
        success. Re-raising here would retry the whole event, and on that retry
        the offer call presumably reports "already responded" (see the warning
        branches), so the notification would be skipped anyway. Logging keeps
        the failure visible without a useless retry.
        """
        try:
            await notification
        except Exception as e:
            logger.error(
                "Failed to send notification",
                exc_info=e,
                extra={"notification": kind, **context}
            )

    async def _handle_accept(self, offer_id: UUID, booking_id: UUID, partner_id: UUID) -> None:
        """Accept the offer, then notify the customer and the partner (best effort)."""
        ids = self._id_context(offer_id, booking_id, partner_id)

        if not await self.offer_service.accept_offer(offer_id):
            logger.warning(
                "Accept response processing failed (offer may already be responded)",
                extra=ids
            )
            return

        logger.info(
            "Partner accepted offer, booking assigned",
            extra={**ids, "response_timestamp": datetime.now(timezone.utc).isoformat()}
        )
        await self._send_notification(
            self.notification_service.send_customer_booking_assigned(
                booking_id=booking_id,
                partner_id=partner_id,
            ),
            "customer_booking_assigned",
            ids,
        )
        await self._send_notification(
            self.notification_service.send_acceptance_confirmation(
                partner_id=partner_id,
                offer_id=offer_id,
                booking_id=booking_id,
            ),
            "acceptance_confirmation",
            ids,
        )

    async def _handle_decline(self, offer_id: UUID, booking_id: UUID, partner_id: UUID) -> None:
        """Decline the offer, publish an allocation retry, then notify the partner."""
        ids = self._id_context(offer_id, booking_id, partner_id)

        if not await self.offer_service.decline_offer(offer_id):
            logger.warning(
                "Decline response processing failed (offer may already be responded)",
                extra=ids
            )
            return

        logger.info(
            "Partner declined offer, triggering allocation retry",
            extra={**ids, "response_timestamp": datetime.now(timezone.utc).isoformat()}
        )

        # Publish the allocation retry BEFORE the best-effort notification so a
        # notification failure can never prevent the booking from being
        # re-allocated.
        # NOTE(review): if publish_event itself fails, the event is retried, but
        # decline_offer presumably returns False on that retry, so the retry
        # would still be skipped - consider an idempotent/outbox approach.
        retry_event = {
            "booking_id": str(booking_id),
            "retry_reason": "partner_declined",
            "previous_partner_id": str(partner_id),
            "previous_offer_id": str(offer_id)
        }
        await self.queue_manager.publish_event(
            "booking_allocation_queue",
            retry_event
        )
        logger.info(
            "Allocation retry event published",
            extra={
                "booking_id": str(booking_id),
                "retry_reason": "partner_declined",
                "previous_partner_id": str(partner_id),
                "next_partner_id": "to_be_determined"
            }
        )

        await self._send_notification(
            self.notification_service.send_withdrawal_notification(
                partner_id=partner_id,
                offer_id=offer_id,
                booking_id=booking_id,
            ),
            "withdrawal_notification",
            ids,
        )

    def _validate_event(self, event: dict) -> PartnerResponseEvent:
        """
        Validate event structure and parse into PartnerResponseEvent model.

        Args:
            event: Raw event dictionary from queue

        Returns:
            Validated PartnerResponseEvent object

        Raises:
            ValueError: If event structure is invalid or missing required fields
        """
        required_fields = [
            "offer_id",
            "booking_id",
            "partner_id",
            "response",
            "response_time"
        ]

        # Check for required fields
        missing_fields = [field for field in required_fields if field not in event]
        if missing_fields:
            raise ValueError(
                f"Event missing required fields: {', '.join(missing_fields)}"
            )

        # Validate response value
        if event["response"] not in ("accept", "decline"):
            raise ValueError(
                f"Invalid response value: {event['response']}. Must be 'accept' or 'decline'"
            )

        # Parse into PartnerResponseEvent model (Pydantic will validate types)
        try:
            return PartnerResponseEvent(
                offer_id=UUID(event["offer_id"]),
                booking_id=UUID(event["booking_id"]),
                partner_id=UUID(event["partner_id"]),
                response=event["response"],
                response_time=datetime.fromisoformat(event["response_time"])
            )
        except (ValueError, TypeError, KeyError) as e:
            raise ValueError(f"Event validation failed: {str(e)}") from e

    async def _reclaim_stale_messages(self) -> None:
        """
        Gap 4 fix: reclaim PEL messages idle > 1 minute (e.g. from a previously
        crashed instance) and reprocess them.

        Must be called from within this worker - reclaiming to
        self.consumer_name ensures messages are assigned to a consumer that
        actually processes this queue. The scheduler must NOT do this on our
        behalf.

        Claimed entries are dispatched for processing here; merely claiming
        them would leave them sitting in this consumer's PEL. Entries whose
        IDs are already in flight on this instance are skipped, and entries
        with no fields (deleted from the stream) are just acknowledged.
        """
        try:
            result = await self.queue_manager.redis.xautoclaim(
                self.queue_name,
                self.consumer_group,
                self.consumer_name,
                min_idle_time=60_000,  # 1 minute in ms (safe: healthy consumers ack within seconds)
                start_id="0-0",
                count=10,
            )
        except Exception as e:
            # Consumer group may not exist yet at first startup — not fatal
            logger.debug("PEL reclaim skipped", extra={"queue": self.queue_name, "error": str(e)})
            return

        # XAUTOCLAIM replies [next_start_id, claimed_entries, deleted_ids]
        # (the third element only on Redis >= 7). The claimed entries are the
        # SECOND element; the first is just the cursor for the next scan.
        claimed = result[1] if len(result) > 1 else []
        if not claimed:
            return

        dispatched = 0
        for message_id, data in claimed:
            if message_id is None or message_id in self._in_flight_ids:
                continue
            if not data:
                # Entry no longer exists in the stream: nothing to process,
                # just drop it from the PEL.
                await self._acknowledge(message_id)
                continue
            self._dispatch(message_id, data)
            dispatched += 1

        logger.info(
            "Reclaimed stale PEL messages",
            extra={
                "queue": self.queue_name,
                "count": len(claimed),
                "dispatched": dispatched,
                "consumer": self.consumer_name,
            },
        )

    async def run(self) -> None:
        """
        Run the response worker.

        Sets up signal handlers, reclaims (and reprocesses) any stale PEL
        messages from a previously crashed instance, then starts event
        consumption.
        """
        logger.info(
            "Starting ResponseWorker",
            extra={"consumer_name": self.consumer_name}
        )

        # Setup signal handlers for graceful shutdown
        self.setup_signal_handlers()

        # Gap 4 fix: reclaim stale messages before entering the consume loop
        await self._reclaim_stale_messages()

        # Start consuming events
        await self.consume_response_events()

        logger.info(
            "ResponseWorker stopped",
            extra={"consumer_name": self.consumer_name}
        )
async def main():
    """
    Main entry point for response worker.

    Initializes logging, the Postgres engine, the Redis client / queue manager
    and the offer service, then runs the worker until it stops. Cleanup is
    nested so the database engine is disposed even if Redis initialization
    fails or closing the queue manager raises.
    """
    import os
    import socket

    from app.core.config import get_settings
    from app.core.logging import setup_logging
    from app.db.postgres import create_pg_engine
    from app.queue.redis_client import create_redis_client

    settings = get_settings()
    setup_logging(settings.LOG_LEVEL)
    logger.info("Initializing Response Worker")

    # Initialize database engine
    engine = await create_pg_engine()
    try:
        # Initialize Redis client and queue manager
        redis_client = await create_redis_client()
        queue_manager = RedisQueueManager(redis_client)
        try:
            # Initialize offer service
            offer_service = OfferService(engine, queue_manager)

            # Unique consumer name: the pod name in Kubernetes (HOSTNAME),
            # falling back to the machine hostname. `or` also covers an
            # empty HOSTNAME, which would otherwise yield an empty name.
            consumer_name = os.getenv("HOSTNAME") or socket.gethostname()

            worker = ResponseWorker(
                queue_manager=queue_manager,
                offer_service=offer_service,
                consumer_name=consumer_name,
            )
            await worker.run()
        finally:
            await queue_manager.close()
    finally:
        await engine.dispose()
        logger.info("Response Worker shutdown complete")
if __name__ == "__main__":
    # Script entry point: run the async main() on a fresh event loop via asyncio.run.
    asyncio.run(main())