Prabha-AIMLOPS's picture
initial commit
3243379 verified
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

import redis
import structlog
from prometheus_client import Counter, start_http_server

from .config import BrokerConfig
from .message import Message
# Structured logger used by the code in this module.
logger = structlog.get_logger()

# Prometheus counter of published messages, labelled by queue name.
messages_published = Counter(
    name="messages_published_total",
    documentation="Total number of messages published",
    labelnames=["queue"],
)
class MessageBroker:
    """Redis-backed message broker with retry and dead-letter queues.

    Messages are serialised with ``Message.to_json()`` and pushed onto Redis
    lists named ``queue:<name>``, ``retry:<name>`` and ``dead_letter:<name>``.
    Constructing a broker also starts the Prometheus metrics HTTP server on
    ``config.metrics_port``.
    """

    def __init__(self, config: BrokerConfig):
        """Create the Redis client and start the metrics endpoint.

        Args:
            config: Broker settings. This class reads ``config.redis``,
                ``config.retry`` and ``config.metrics_port``.
        """
        self.config = config
        logger.info(
            "Creating Redis connection pool",
            host=config.redis.host,
            port=config.redis.port,
            ssl=config.redis.ssl,
        )

        # Base connection parameters
        connection_params = {
            "host": config.redis.host,
            "port": config.redis.port,
            "db": config.redis.db,
            "password": config.redis.password,
            "decode_responses": True,
            "max_connections": config.redis.connection_pool_size,
        }

        # Add SSL configuration if enabled
        if config.redis.ssl:
            # ConnectionPool hands its keyword arguments to the connection
            # class. TLS options are only understood by SSLConnection, so it
            # is selected explicitly instead of passing an ``ssl`` flag, which
            # the default connection class does not accept.
            # NOTE(review): ssl_cert_reqs=None turns off server certificate
            # verification - confirm this is intended outside development.
            connection_params.update({
                "connection_class": redis.SSLConnection,
                "ssl_cert_reqs": None,
                "ssl_ca_certs": None,
            })

        connection_pool = redis.ConnectionPool(**connection_params)
        self._redis = redis.Redis(connection_pool=connection_pool)

        # Start Prometheus metrics server
        start_http_server(config.metrics_port)
        logger.info(
            "Message broker initialized",
            config=self._loggable_config(config),
        )

    @staticmethod
    def _loggable_config(config: BrokerConfig) -> Dict[str, Any]:
        """Return ``config.dict()`` with the Redis password masked for logging."""
        snapshot = config.dict()
        redis_section = snapshot.get("redis") if isinstance(snapshot, dict) else None
        if isinstance(redis_section, dict) and redis_section.get("password"):
            # Work on copies so the object returned by config.dict() is untouched.
            snapshot = dict(snapshot)
            snapshot["redis"] = {**redis_section, "password": "***"}
        return snapshot

    def publish(self, queue: str, payload: Dict[str, Any], max_retries: Optional[int] = None) -> Message:
        """Publish a message to a queue.

        Args:
            queue: Logical queue name; the Redis key is ``queue:<queue>``.
            payload: Message body passed to ``Message``.
            max_retries: Retry budget for this message. ``None`` falls back to
                ``config.retry.max_retries``; an explicit ``0`` is honoured.

        Returns:
            The ``Message`` that was pushed.

        Raises:
            redis.RedisError: If the push fails (logged, then re-raised).
        """
        # Compare against None: ``0`` is a legitimate "never retry" budget and
        # must not be replaced by the configured default.
        if max_retries is None:
            max_retries = self.config.retry.max_retries

        message = Message(
            queue=queue,
            payload=payload,
            max_retries=max_retries,
        )
        try:
            self._redis.lpush(f"queue:{queue}", message.to_json())
            messages_published.labels(queue=queue).inc()
            logger.info("Message published", message_id=message.id, queue=queue)
            return message
        except redis.RedisError as e:
            logger.error("Failed to publish message", error=str(e), queue=queue)
            raise

    def retry_message(self, message: Message) -> None:
        """Move a message to the retry queue.

        When ``message.retry_count`` has reached ``message.max_retries`` the
        message is sent to the dead letter queue instead. Otherwise the retry
        counter is incremented, ``message.next_retry_at`` is set using an
        exponential backoff capped at ``config.retry.max_delay``, and the
        message is pushed onto ``retry:<queue>``.

        Args:
            message: The message to reschedule; it is mutated in place.

        Raises:
            redis.RedisError: If the push fails (logged, then re-raised).
        """
        if message.retry_count >= message.max_retries:
            self._move_to_dead_letter(message)
            return

        message.retry_count += 1
        retry_cfg = self.config.retry
        # First retry waits initial_delay; each later one multiplies the wait
        # by backoff_factor, never exceeding max_delay.
        delay = min(
            retry_cfg.initial_delay * (retry_cfg.backoff_factor ** (message.retry_count - 1)),
            retry_cfg.max_delay,
        )
        # NOTE(review): naive UTC timestamp. datetime.utcnow() is deprecated
        # since Python 3.12; moving to an aware datetime requires checking
        # Message and whatever reads next_retry_at first.
        message.next_retry_at = datetime.utcnow() + timedelta(seconds=delay)

        try:
            self._redis.lpush(f"retry:{message.queue}", message.to_json())
            logger.info(
                "Message moved to retry queue",
                message_id=message.id,
                queue=message.queue,
                retry_count=message.retry_count,
            )
        except redis.RedisError as e:
            logger.error("Failed to move message to retry queue", error=str(e))
            raise

    def _move_to_dead_letter(self, message: Message) -> None:
        """Move a message to the dead letter queue.

        Args:
            message: The message to push onto ``dead_letter:<queue>``.

        Raises:
            redis.RedisError: If the push fails (logged, then re-raised).
        """
        try:
            self._redis.lpush(f"dead_letter:{message.queue}", message.to_json())
            logger.warning(
                "Message moved to dead letter queue",
                message_id=message.id,
                queue=message.queue,
                retry_count=message.retry_count,
            )
        except redis.RedisError as e:
            logger.error("Failed to move message to dead letter queue", error=str(e))
            raise