Spaces:
Configuration error
Configuration error
| from async_framework import MessageConsumer, Message | |
| import time | |
| import structlog | |
| from config import BROKER_CONFIG | |
| logger = structlog.get_logger() | |
def process_order(message: Message):
    """Handle one message taken from the orders queue.

    Logs the message id together with the payload's ``order_id`` and
    ``amount``, pauses for half a second to simulate work, and then fails
    for large orders.

    Raises:
        ValueError: if the payload's ``amount`` (0 when absent) is above 900.
    """
    payload = message.payload
    logger.info(
        "Processing order",
        message_id=message.id,
        order_id=payload.get("order_id"),
        amount=payload.get("amount"),
    )

    # Stand-in for real processing work.
    time.sleep(0.5)

    # Simulated occasional failure: large orders are rejected.
    amount = payload.get("amount", 0)
    if amount > 900:
        raise ValueError("Order amount too high")
def process_notification(message: Message):
    """Handle one message taken from the notifications queue.

    Logs the message id together with the payload's ``type`` and
    ``priority``, pauses briefly to simulate work, and then fails for
    high-priority notifications that have not been retried enough.

    Raises:
        ValueError: if the payload's ``priority`` is ``"high"`` and
            ``message.retry_count`` is below 2.
    """
    payload = message.payload
    logger.info(
        "Processing notification",
        message_id=message.id,
        notification_type=payload.get("type"),
        priority=payload.get("priority"),
    )

    # Stand-in for real processing work.
    time.sleep(0.3)

    # Simulated occasional failure for high-priority notifications.
    # retry_count is only consulted for high-priority messages; presumably it
    # starts at 0 and grows by one per retry, so the third attempt succeeds.
    is_high_priority = payload.get("priority") == "high"
    if is_high_priority and message.retry_count < 2:
        raise ValueError("Failed to send high priority notification")
def process_analytics(message: Message):
    """Handle one message taken from the analytics queue.

    Logs the message id together with the payload's ``event_type`` and
    ``user_id``, then pauses briefly to simulate work. This handler never
    raises on its own.
    """
    payload = message.payload
    logger.info(
        "Processing analytics event",
        message_id=message.id,
        event_type=payload.get("event_type"),
        user_id=payload.get("user_id"),
    )

    # Stand-in for real processing work.
    time.sleep(0.1)
def main():
    """Start one consumer per queue and block until interrupted.

    Builds a ``MessageConsumer`` for the "orders", "notifications" and
    "analytics" queues, starts each one in turn, then idles in a sleep loop
    to keep the main thread running. A ``KeyboardInterrupt`` (Ctrl-C) is
    caught and treated as a shutdown request.

    Every consumer is stopped on the way out, whether the exit was an
    interrupt or an exception raised while starting a consumer. Any such
    exception still propagates to the caller after the stop calls.
    """
    # Create consumers for different queues
    consumers = [
        MessageConsumer(
            config=BROKER_CONFIG,
            queue="orders",
            handler=process_order,
        ),
        MessageConsumer(
            config=BROKER_CONFIG,
            queue="notifications",
            handler=process_notification,
        ),
        MessageConsumer(
            config=BROKER_CONFIG,
            queue="analytics",
            handler=process_analytics,
        ),
    ]

    try:
        # Start all consumers
        for consumer in consumers:
            consumer.start()
            logger.info("Started consumer", queue=consumer.queue)

        # Keep the main thread running
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Shutting down consumers")
    finally:
        # Stop in `finally` rather than only on KeyboardInterrupt, so that a
        # failure in a later start() does not leave earlier consumers running.
        # NOTE(review): as on the original interrupt path, stop() is called on
        # every consumer, including any that never started -- confirm that
        # MessageConsumer.stop() tolerates this.
        for consumer in consumers:
            consumer.stop()
# Run the consumers only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()