from async_framework import MessageConsumer, Message import time import structlog from config import BROKER_CONFIG logger = structlog.get_logger() def process_order(message: Message): logger.info( "Processing order", message_id=message.id, order_id=message.payload.get("order_id"), amount=message.payload.get("amount") ) # Simulate processing time time.sleep(0.5) # Simulate occasional failures if message.payload.get("amount", 0) > 900: raise ValueError("Order amount too high") def process_notification(message: Message): logger.info( "Processing notification", message_id=message.id, notification_type=message.payload.get("type"), priority=message.payload.get("priority") ) # Simulate processing time time.sleep(0.3) # Simulate occasional failures for high priority notifications if message.payload.get("priority") == "high": if message.retry_count < 2: # Will succeed on third try raise ValueError("Failed to send high priority notification") def process_analytics(message: Message): logger.info( "Processing analytics event", message_id=message.id, event_type=message.payload.get("event_type"), user_id=message.payload.get("user_id") ) # Simulate processing time time.sleep(0.1) def main(): # Create consumers for different queues consumers = [ MessageConsumer( config=BROKER_CONFIG, queue="orders", handler=process_order ), MessageConsumer( config=BROKER_CONFIG, queue="notifications", handler=process_notification ), MessageConsumer( config=BROKER_CONFIG, queue="analytics", handler=process_analytics ) ] try: # Start all consumers for consumer in consumers: consumer.start() logger.info( "Started consumer", queue=consumer.queue ) # Keep the main thread running while True: time.sleep(1) except KeyboardInterrupt: logger.info("Shutting down consumers") for consumer in consumers: consumer.stop() if __name__ == "__main__": main()