File size: 2,438 Bytes
2452e46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from async_framework import MessageConsumer, Message
import time
import structlog
from config import BROKER_CONFIG

logger = structlog.get_logger()

def process_order(message: Message):
    logger.info(
        "Processing order",
        message_id=message.id,
        order_id=message.payload.get("order_id"),
        amount=message.payload.get("amount")
    )
    # Simulate processing time
    time.sleep(0.5)
    # Simulate occasional failures
    if message.payload.get("amount", 0) > 900:
        raise ValueError("Order amount too high")

def process_notification(message: Message):
    logger.info(
        "Processing notification",
        message_id=message.id,
        notification_type=message.payload.get("type"),
        priority=message.payload.get("priority")
    )
    # Simulate processing time
    time.sleep(0.3)
    # Simulate occasional failures for high priority notifications
    if message.payload.get("priority") == "high":
        if message.retry_count < 2:  # Will succeed on third try
            raise ValueError("Failed to send high priority notification")

def process_analytics(message: Message):
    logger.info(
        "Processing analytics event",
        message_id=message.id,
        event_type=message.payload.get("event_type"),
        user_id=message.payload.get("user_id")
    )
    # Simulate processing time
    time.sleep(0.1)

def main():
    # Create consumers for different queues
    consumers = [
        MessageConsumer(
            config=BROKER_CONFIG,
            queue="orders",
            handler=process_order
        ),
        MessageConsumer(
            config=BROKER_CONFIG,
            queue="notifications",
            handler=process_notification
        ),
        MessageConsumer(
            config=BROKER_CONFIG,
            queue="analytics",
            handler=process_analytics
        )
    ]

    try:
        # Start all consumers
        for consumer in consumers:
            consumer.start()
            logger.info(
                "Started consumer",
                queue=consumer.queue
            )

        # Keep the main thread running
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Shutting down consumers")
        for consumer in consumers:
            consumer.stop()

if __name__ == "__main__":
    main()