| import json |
| import logging |
| from aio_pika.abc import AbstractIncomingMessage |
| from app.rabbitmq.connection import rabbitmq |
| from app.rabbitmq.queues import ( |
| QUEUE_AUTH_EVENTS, QUEUE_CHAT_EVENTS, |
| QUEUE_AUDIT_LOG, QUEUE_NOTIFICATIONS |
| ) |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
async def handle_auth_event(message: AbstractIncomingMessage) -> None:
    """Consume one authentication event and log it.

    The message body is decoded to text, parsed as JSON, and its ``event``
    and ``username`` fields are written to the module logger at INFO level.

    Args:
        message: Incoming RabbitMQ message; its body is expected to hold a
            JSON object.

    Returns:
        None. Failures while decoding or logging are caught and logged with
        their traceback instead of being raised, so the
        ``message.process()`` context always exits without an exception.
    """
    async with message.process():
        try:
            data = json.loads(message.body.decode())
            logger.info(
                "[AUTH EVENT] %s - user: %s",
                data.get("event"),
                data.get("username"),
            )
        except Exception as exc:
            # Deliberate best-effort boundary: a malformed payload (invalid
            # UTF-8, invalid JSON, or a JSON value that is not an object)
            # must not escape the consumer callback. logger.exception keeps
            # the traceback that a plain error log would drop.
            logger.exception("Auth event handler error: %s", exc)
|
|
|
|
async def handle_chat_event(message: AbstractIncomingMessage) -> None:
    """Consume one chat event and log it.

    The message body is decoded to text, parsed as JSON, and its ``event``
    and ``conversation_id`` fields are written to the module logger at INFO
    level.

    Args:
        message: Incoming RabbitMQ message; its body is expected to hold a
            JSON object.

    Returns:
        None. Failures while decoding or logging are caught and logged with
        their traceback instead of being raised, so the
        ``message.process()`` context always exits without an exception.
    """
    async with message.process():
        try:
            data = json.loads(message.body.decode())
            logger.info(
                "[CHAT EVENT] %s - conv: %s",
                data.get("event"),
                data.get("conversation_id"),
            )
        except Exception as exc:
            # Deliberate best-effort boundary: a malformed payload (invalid
            # UTF-8, invalid JSON, or a JSON value that is not an object)
            # must not escape the consumer callback. logger.exception keeps
            # the traceback that a plain error log would drop.
            logger.exception("Chat event handler error: %s", exc)
|
|
|
|
async def handle_audit_log(message: AbstractIncomingMessage) -> None:
    """Consume one audit-log entry and log it.

    The message body is decoded to text, parsed as JSON, and its ``event``,
    ``user`` and ``timestamp`` fields are written to the module logger at
    INFO level.

    Args:
        message: Incoming RabbitMQ message; its body is expected to hold a
            JSON object.

    Returns:
        None. Failures while decoding or logging are caught and logged with
        their traceback instead of being raised, so the
        ``message.process()`` context always exits without an exception.
    """
    async with message.process():
        try:
            data = json.loads(message.body.decode())
            logger.info(
                "[AUDIT] %s - user: %s at %s",
                data.get("event"),
                data.get("user"),
                data.get("timestamp"),
            )
        except Exception as exc:
            # Deliberate best-effort boundary: a malformed payload (invalid
            # UTF-8, invalid JSON, or a JSON value that is not an object)
            # must not escape the consumer callback. logger.exception keeps
            # the traceback that a plain error log would drop.
            logger.exception("Audit log handler error: %s", exc)
|
|
|
|
async def start_consumers() -> None:
    """Register all consumers. Intended to be called on app startup.

    When ``rabbitmq.is_connected`` is false, a warning is logged and no
    consumer is registered. Otherwise each queue is bound to its handler
    through ``rabbitmq.consume``, one after another, in the order listed
    below; an exception raised by any ``consume`` call propagates to the
    caller and the remaining queues are not registered.
    """
    if not rabbitmq.is_connected:
        logger.warning("RabbitMQ not connected - consumers not started")
        return

    # Queue -> handler table; order matches the registration order.
    bindings = (
        (QUEUE_AUTH_EVENTS, handle_auth_event),
        (QUEUE_CHAT_EVENTS, handle_chat_event),
        (QUEUE_AUDIT_LOG, handle_audit_log),
    )
    for queue_name, handler in bindings:
        await rabbitmq.consume(queue_name, handler)

    logger.info("All RabbitMQ consumers started")
|
|