backend / app /rabbitmq /consumers.py
precison9's picture
integrate RabbitMQ with CloudAMQP
3557eaa
import json
import logging
from aio_pika.abc import AbstractIncomingMessage
from app.rabbitmq.connection import rabbitmq
from app.rabbitmq.queues import (
QUEUE_AUTH_EVENTS, QUEUE_CHAT_EVENTS,
QUEUE_AUDIT_LOG, QUEUE_NOTIFICATIONS
)
# Module-level logger, named after this module, used by every handler below.
logger = logging.getLogger(__name__)
async def handle_auth_event(message: AbstractIncomingMessage):
    """Log an authentication event delivered by RabbitMQ.

    The message body is decoded and parsed as JSON, and the ``event`` and
    ``username`` fields are logged. Both are read with ``.get``, so a missing
    key is logged as ``None`` instead of raising.

    Any exception raised while parsing or handling the payload (invalid
    JSON, undecodable bytes, a payload that is not a JSON object, ...) is
    caught and logged with its traceback instead of being re-raised, so the
    ``message.process()`` context always exits without an error.

    Args:
        message: Incoming message whose body is expected to hold a
            JSON-encoded object.
    """
    async with message.process():
        try:
            data = json.loads(message.body.decode())
            event = data.get("event")
            logger.info("[AUTH EVENT] %s — user: %s", event, data.get("username"))
            # Add your business logic here:
            # e.g. send welcome email on user.registered
            # e.g. alert on suspicious login patterns
        except Exception as e:
            # Deliberately broad: a bad payload must not escape the
            # process() context. logger.exception keeps the traceback that
            # a plain logger.error(str(e)) would discard.
            logger.exception("Auth event handler error: %s", e)
async def handle_chat_event(message: AbstractIncomingMessage):
    """Log a chat event delivered by RabbitMQ.

    The message body is decoded and parsed as JSON, and the ``event`` and
    ``conversation_id`` fields are logged. Both are read with ``.get``, so a
    missing key is logged as ``None`` instead of raising.

    Any exception raised while parsing or handling the payload (invalid
    JSON, undecodable bytes, a payload that is not a JSON object, ...) is
    caught and logged with its traceback instead of being re-raised, so the
    ``message.process()`` context always exits without an error.

    Args:
        message: Incoming message whose body is expected to hold a
            JSON-encoded object.
    """
    async with message.process():
        try:
            data = json.loads(message.body.decode())
            event = data.get("event")
            logger.info(
                "[CHAT EVENT] %s — conv: %s", event, data.get("conversation_id")
            )
            # Add your business logic here:
            # e.g. track usage metrics
            # e.g. send notifications
        except Exception as e:
            # Deliberately broad: a bad payload must not escape the
            # process() context. logger.exception keeps the traceback that
            # a plain logger.error(str(e)) would discard.
            logger.exception("Chat event handler error: %s", e)
async def handle_audit_log(message: AbstractIncomingMessage):
    """Log an audit record delivered by RabbitMQ.

    The message body is decoded and parsed as JSON, and the ``event``,
    ``user`` and ``timestamp`` fields are logged. All three are read with
    ``.get``, so a missing key is logged as ``None`` instead of raising.

    Any exception raised while parsing or handling the payload (invalid
    JSON, undecodable bytes, a payload that is not a JSON object, ...) is
    caught and logged with its traceback instead of being re-raised, so the
    ``message.process()`` context always exits without an error.

    Args:
        message: Incoming message whose body is expected to hold a
            JSON-encoded object.
    """
    async with message.process():
        try:
            data = json.loads(message.body.decode())
            logger.info(
                "[AUDIT] %s — user: %s at %s",
                data.get("event"),
                data.get("user"),
                data.get("timestamp"),
            )
            # Add your business logic here:
            # e.g. write to audit collection in MongoDB
            # e.g. forward to external SIEM
        except Exception as e:
            # Deliberately broad: a bad payload must not escape the
            # process() context. logger.exception keeps the traceback that
            # a plain logger.error(str(e)) would discard.
            logger.exception("Audit log handler error: %s", e)
async def start_consumers():
    """Register all consumers. Called on app startup.

    If ``rabbitmq.is_connected`` is falsy, a warning is logged and nothing is
    registered. Otherwise each handler is attached to its queue through
    ``rabbitmq.consume``, awaited one at a time in a fixed order
    (auth events, chat events, audit log), and a confirmation is logged.

    Note: no consumer is registered here for ``QUEUE_NOTIFICATIONS``.
    """
    if not rabbitmq.is_connected:
        logger.warning("RabbitMQ not connected — consumers not started")
        return

    # Queue -> handler pairs, kept in the order they must be registered.
    subscriptions = (
        (QUEUE_AUTH_EVENTS, handle_auth_event),
        (QUEUE_CHAT_EVENTS, handle_chat_event),
        (QUEUE_AUDIT_LOG, handle_audit_log),
    )
    for queue_name, handler in subscriptions:
        await rabbitmq.consume(queue_name, handler)

    logger.info("All RabbitMQ consumers started")