Async Framework

A Python library for Redis-based message queue processing with monitoring and retry capabilities. It supports distributed message processing with automatic retries, dead letter queues, and Prometheus metrics.

Features

  • Redis-based message broker and consumer
  • Configurable number of workers
  • JSON message format with standardized structure
  • Efficient Redis connection pooling
  • Prometheus metrics for monitoring
  • Automatic and manual retry mechanisms
  • Dead letter queue for failed messages
  • Batch processing support
  • Distributed environment ready

Requirements

  • Python 3.10+
  • Redis server
  • Dependencies (automatically installed):
    • redis>=4.5.0
    • pydantic>=2.0.0
    • prometheus-client>=0.17.0
    • tenacity>=8.2.0
    • structlog>=23.1.0

Installation

Installing from Source

  1. Clone the repository:
git clone https://github.com/yourusername/async-framework.git
cd async-framework
  2. Install the package:
pip install .

Building the Package

To build the package as a wheel file:

  1. Install build requirements:
pip install build wheel
  2. Build the package:
python -m build

This will create two files in the dist directory:

  • async_framework-0.1.0-py3-none-any.whl (Wheel file)
  • async_framework-0.1.0.tar.gz (Source distribution)

Installing the Built Package

You can install the wheel file in other projects using:

pip install async_framework-0.1.0-py3-none-any.whl

Or, if the package has been published to your private PyPI repository and pip is configured to use that index, install it by name:

pip install async-framework

Usage

Configuration

from async_framework import BrokerConfig, RedisConfig, RetryConfig

config = BrokerConfig(
    redis=RedisConfig(
        host="localhost",
        port=6379,
        password="optional_password",
        connection_pool_size=10
    ),
    retry=RetryConfig(
        max_retries=3,
        initial_delay=1.0,
        max_delay=60.0,
        backoff_factor=2.0
    ),
    num_workers=4,
    batch_size=10,
    polling_interval=1.0,
    metrics_port=8000
)
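The retry settings above suggest an exponential backoff schedule. The README does not state the exact formula, so the following is only a sketch. It assumes the delay before retry n is initial_delay * backoff_factor ** n, capped at max_delay. The helper retry_delay is hypothetical and not part of the framework's API.

```python
def retry_delay(
    attempt: int,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_factor: float = 2.0,
) -> float:
    """Return the assumed delay in seconds before retry number `attempt` (0-based)."""
    if attempt < 0:
        raise ValueError("attempt must be non-negative")
    # Grow the delay geometrically, but never exceed max_delay.
    return min(initial_delay * backoff_factor ** attempt, max_delay)


# Delays for the first three retries with the configuration shown above.
schedule = [retry_delay(n) for n in range(3)]
print(schedule)  # [1.0, 2.0, 4.0]
```

Under this assumption, max_delay only starts to matter from the seventh retry onward (2 ** 6 = 64 seconds), so it has no effect when max_retries=3.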

Publishing Messages

from async_framework import MessageBroker

# Initialize the broker
broker = MessageBroker(config)

# Publish a message
message = broker.publish(
    queue="my_queue",
    payload={"key": "value"},
    max_retries=3  # Optional, defaults to config value
)

Consuming Messages

import time

from async_framework import MessageConsumer, Message

# Define a message handler
def process_message(message: Message):
    print(f"Processing message {message.id}: {message.payload}")
    # Your processing logic here
    # Raise an exception to trigger retry mechanism

# Initialize the consumer
consumer = MessageConsumer(
    config=config,
    queue="my_queue",
    handler=process_message
)

# Start consuming messages
consumer.start()

try:
    # Keep the main thread running
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    consumer.stop()
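As noted in the handler comments above, raising an exception is what triggers the retry mechanism. The sketch below shows one way a handler might signal failure on a malformed payload. The process_order function and the order_id field are hypothetical. A SimpleNamespace stands in for the framework's Message class so the example runs without Redis; it provides only the id and payload attributes used above.

```python
from types import SimpleNamespace


def process_order(message) -> None:
    """Hypothetical handler: reject payloads that lack an 'order_id' field."""
    payload = message.payload
    if "order_id" not in payload:
        # Per the README, an exception raised in the handler triggers a retry.
        raise ValueError(f"message {message.id} has no order_id")
    print(f"Processing order {payload['order_id']}")


# Stand-in object with the two attributes the handler reads.
fake_message = SimpleNamespace(id="test-1", payload={"order_id": 42})
process_order(fake_message)
```

Retrying only helps with transient failures. A payload that is permanently invalid, as in this sketch, would presumably exhaust max_retries and end up in the dead letter queue.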

Monitoring

The framework exposes Prometheus metrics at http://localhost:8000 (configurable via metrics_port):

  • messages_published_total - Counter of published messages by queue
  • messages_processed_total - Counter of processed messages by queue and status
  • message_processing_seconds - Histogram of message processing time by queue
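To check these metrics from a script, the scraped output can be parsed as Prometheus text exposition format. The following is a minimal sketch using only the standard library. The label names (queue, status), the status values, and the sample numbers are assumptions for illustration and are not taken from the framework. The parser is deliberately simple and does not handle escaped quotes or braces inside label values.

```python
import re

# One sample line: metric name, optional {labels}, then a numeric value.
SAMPLE_LINE = re.compile(
    r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(?P<labels>[^}]*)\})?\s+(?P<value>\S+)'
)
LABEL = re.compile(r'(\w+)="([^"]*)"')


def parse_metrics(text: str) -> list[tuple[str, dict[str, str], float]]:
    """Parse exposition text into (name, labels, value) tuples."""
    samples = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        match = SAMPLE_LINE.match(line)
        if match is None:
            continue
        labels = dict(LABEL.findall(match.group("labels") or ""))
        samples.append((match.group("name"), labels, float(match.group("value"))))
    return samples


# Illustrative scrape output (values and labels are made up).
example = """
# TYPE messages_published_total counter
messages_published_total{queue="my_queue"} 12.0
messages_processed_total{queue="my_queue",status="success"} 10.0
messages_processed_total{queue="my_queue",status="failed"} 2.0
"""

processed = sum(
    value for name, _, value in parse_metrics(example)
    if name == "messages_processed_total"
)
print(processed)  # 12.0
```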

Queue Structure

The framework uses the following Redis key patterns:

  • queue:{queue_name} - Main message queue
  • retry:{queue_name} - Retry queue with scheduled messages
  • dead_letter:{queue_name} - Dead letter queue for failed messages
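When inspecting a queue by hand (for example with redis-cli), it can help to derive the three key names from the queue name. The helper below is a hypothetical convenience and not part of the framework. It only applies the key patterns listed above.

```python
def queue_keys(queue_name: str) -> dict[str, str]:
    """Return the Redis keys used for a queue, following the patterns above."""
    return {
        "main": f"queue:{queue_name}",
        "retry": f"retry:{queue_name}",
        "dead_letter": f"dead_letter:{queue_name}",
    }


keys = queue_keys("my_queue")
print(keys["dead_letter"])  # dead_letter:my_queue
```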

Message Format

Messages are stored in JSON format with the following structure:

{
    "id": "uuid",
    "queue": "queue_name",
    "payload": {
        // Your message data
    },
    "created_at": "2023-08-26T12:00:00Z",
    "retry_count": 0,
    "max_retries": 3,
    "next_retry_at": null,
    "error": null
}
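As a rough illustration of this structure, the sketch below builds a message dictionary with the same fields and round-trips it through JSON. The function build_message is hypothetical. The framework's own Message model is not shown in this README and may construct or validate these fields differently.

```python
import json
import uuid
from datetime import datetime, timezone


def build_message(queue: str, payload: dict, max_retries: int = 3) -> dict:
    """Build a dict with the fields listed in the message format above."""
    return {
        "id": str(uuid.uuid4()),
        "queue": queue,
        "payload": payload,
        # UTC timestamp in the same shape as the example, e.g. 2023-08-26T12:00:00Z
        "created_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "retry_count": 0,
        "max_retries": max_retries,
        "next_retry_at": None,  # serialized as JSON null
        "error": None,
    }


message = build_message("my_queue", {"key": "value"})
encoded = json.dumps(message)
decoded = json.loads(encoded)
print(decoded["queue"], decoded["retry_count"])
```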

Development

  1. Create a virtual environment:
python -m venv venv
source venv/bin/activate  # Linux/MacOS
# or
.\venv\Scripts\activate  # Windows
  2. Install development dependencies:
pip install -e ".[dev]"
  3. Run tests:
pytest

License

MIT License. See LICENSE file for details.