# async-framework-test / consumers.py
# Prabha-AIMLOPS: initial files (2452e46, verified)
from async_framework import MessageConsumer, Message
import time
import structlog
from config import BROKER_CONFIG
# Module-level structlog logger shared by every handler and by main() below.
logger = structlog.get_logger()
def process_order(message: Message):
    """Handle a single message taken from the orders queue.

    Logs the message id together with the ``order_id`` and ``amount`` entries
    of the payload, pauses for half a second to stand in for real work, and
    then rejects the order when its amount is above 900.

    Args:
        message: The incoming message; its ``id`` and ``payload`` are read.

    Raises:
        ValueError: If the payload's ``amount`` is greater than 900 (a missing
            amount is treated as 0).
    """
    log_fields = {
        "message_id": message.id,
        "order_id": message.payload.get("order_id"),
        "amount": message.payload.get("amount"),
    }
    logger.info("Processing order", **log_fields)

    # Stand-in for the time real order processing would take.
    time.sleep(0.5)

    # Deliberate failure path so that error handling can be exercised.
    amount = message.payload.get("amount", 0)
    if amount > 900:
        raise ValueError("Order amount too high")
def process_notification(message: Message):
    """Handle a single message taken from the notifications queue.

    Logs the message id together with the ``type`` and ``priority`` entries of
    the payload, pauses for 0.3 seconds to stand in for real work, and then
    fails on purpose for high-priority notifications whose ``retry_count`` is
    still below 2.

    Args:
        message: The incoming message; its ``id``, ``payload`` and (for
            high-priority notifications only) ``retry_count`` are read.

    Raises:
        ValueError: If the payload's ``priority`` is ``"high"`` and
            ``message.retry_count`` is less than 2.
    """
    log_fields = {
        "message_id": message.id,
        "notification_type": message.payload.get("type"),
        "priority": message.payload.get("priority"),
    }
    logger.info("Processing notification", **log_fields)

    # Stand-in for the time real delivery would take.
    time.sleep(0.3)

    # Deliberate failure path: a high-priority notification is rejected while
    # retry_count is 0 or 1.  Presumably the framework increments retry_count
    # on each redelivery, so the third attempt goes through -- verify against
    # the consumer implementation.
    is_high_priority = message.payload.get("priority") == "high"
    if is_high_priority and message.retry_count < 2:
        raise ValueError("Failed to send high priority notification")
def process_analytics(message: Message):
    """Handle a single message taken from the analytics queue.

    Logs the message id together with the ``event_type`` and ``user_id``
    entries of the payload, then pauses for 0.1 seconds to stand in for real
    work.  This handler has no simulated failure path.

    Args:
        message: The incoming message; its ``id`` and ``payload`` are read.
    """
    log_fields = {
        "message_id": message.id,
        "event_type": message.payload.get("event_type"),
        "user_id": message.payload.get("user_id"),
    }
    logger.info("Processing analytics event", **log_fields)

    # Stand-in for the time real event processing would take.
    time.sleep(0.1)
def main():
    """Run one consumer per queue until the process is interrupted.

    Builds a ``MessageConsumer`` for each of the ``orders``,
    ``notifications`` and ``analytics`` queues, starts them in that order and
    then waits in a sleep loop.  ``KeyboardInterrupt`` (Ctrl-C) ends the wait
    and the function returns normally.

    Shutdown runs in a ``finally`` clause, so ``stop()`` is called on every
    consumer no matter how the wait ends.  If start-up or the wait loop fails
    with any other exception, the consumers are still stopped before that
    exception propagates to the caller.
    """
    # One (queue name, handler) pair per consumer, in start-up order.
    queue_handlers = (
        ("orders", process_order),
        ("notifications", process_notification),
        ("analytics", process_analytics),
    )
    consumers = [
        MessageConsumer(config=BROKER_CONFIG, queue=queue, handler=handler)
        for queue, handler in queue_handlers
    ]

    try:
        for consumer in consumers:
            consumer.start()
            logger.info("Started consumer", queue=consumer.queue)

        # Keep the main thread alive while the consumers work.
        # NOTE(review): this relies on start() returning promptly rather than
        # blocking -- confirm against MessageConsumer.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to end the process, so it is not
        # re-raised; the actual cleanup happens in the finally clause.
        pass
    finally:
        # Runs on Ctrl-C and on unexpected errors alike, so a failure part-way
        # through start-up no longer leaves earlier consumers running.
        logger.info("Shutting down consumers")
        for consumer in consumers:
            consumer.stop()
# Script entry point: start the consumers only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()