# Async Framework

A Python library for Redis-based message queue processing with monitoring and retry capabilities. It supports distributed message processing with automatic retries, dead letter queues, and Prometheus metrics.

## Features

- Redis-based message broker and consumer
- Configurable number of workers
- JSON message format with standardized structure
- Efficient Redis connection pooling
- Prometheus metrics for monitoring
- Automatic and manual retry mechanisms
- Dead letter queue for failed messages
- Batch processing support
- Distributed environment ready

## Requirements

- Python 3.10+
- Redis server
- Dependencies (automatically installed):
  - redis>=4.5.0
  - pydantic>=2.0.0
  - prometheus-client>=0.17.0
  - tenacity>=8.2.0
  - structlog>=23.1.0

## Installation

### Installing from Source

1. Clone the repository:

   ```bash
   git clone https://github.com/yourusername/async-framework.git
   cd async-framework
   ```

2. Install the package:

   ```bash
   pip install .
   ```

### Building the Package

To build the package as a wheel file:

1. Install build requirements:

   ```bash
   pip install build wheel
   ```

2. Build the package:

   ```bash
   python -m build
   ```

This creates two files in the `dist` directory:

- `async_framework-0.1.0-py3-none-any.whl` (Wheel file)
- `async_framework-0.1.0.tar.gz` (Source distribution)

### Installing the Built Package

You can install the wheel file in other projects using:

```bash
pip install async_framework-0.1.0-py3-none-any.whl
```

Or install directly from your private PyPI repository:

```bash
pip install async-framework
```

## Usage

### Configuration

```python
from async_framework import BrokerConfig, RedisConfig, RetryConfig

config = BrokerConfig(
    redis=RedisConfig(
        host="localhost",
        port=6379,
        password="optional_password",
        connection_pool_size=10
    ),
    retry=RetryConfig(
        max_retries=3,
        initial_delay=1.0,
        max_delay=60.0,
        backoff_factor=2.0
    ),
    num_workers=4,
    batch_size=10,
    polling_interval=1.0,
    metrics_port=8000
)
```

### Publishing Messages

```python
from async_framework import MessageBroker

# Initialize the broker
broker = MessageBroker(config)

# Publish a message
message = broker.publish(
    queue="my_queue",
    payload={"key": "value"},
    max_retries=3  # Optional, defaults to config value
)
```

### Consuming Messages

```python
import time

from async_framework import MessageConsumer, Message

# Define a message handler
def process_message(message: Message):
    print(f"Processing message {message.id}: {message.payload}")
    # Your processing logic here
    # Raise an exception to trigger retry mechanism

# Initialize the consumer
consumer = MessageConsumer(
    config=config,
    queue="my_queue",
    handler=process_message
)

# Start consuming messages
consumer.start()

try:
    # Keep the main thread running
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    consumer.stop()
```

### Monitoring

The framework exposes Prometheus metrics at `http://localhost:8000` (configurable via `metrics_port`):

- `messages_published_total` - Counter of published messages by queue
- `messages_processed_total` - Counter of processed messages by queue and status
- `message_processing_seconds` - Histogram of message processing time by queue

### Queue Structure

The framework uses the following Redis key patterns:

- `queue:{queue_name}` - Main message queue
- `retry:{queue_name}` - Retry queue with scheduled messages
- `dead_letter:{queue_name}` - Dead letter queue for failed messages

### Message Format

Messages are stored in JSON format with the following structure (`payload` holds your message data):

```json
{
  "id": "uuid",
  "queue": "queue_name",
  "payload": {"key": "value"},
  "created_at": "2023-08-26T12:00:00Z",
  "retry_count": 0,
  "max_retries": 3,
  "next_retry_at": null,
  "error": null
}
```

## Development

1. Create a virtual environment:

   ```bash
   python -m venv venv
   source venv/bin/activate  # Linux/macOS
   # or
   .\venv\Scripts\activate  # Windows
   ```

2. Install development dependencies:

   ```bash
   pip install -e ".[dev]"
   ```

3. Run tests:

   ```bash
   pytest
   ```

## License

MIT License. See LICENSE file for details.
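
## Appendix: Retry Delay Sketch

The `RetryConfig` fields shown under Configuration (`initial_delay`, `max_delay`, `backoff_factor`) suggest an exponential backoff schedule, but this README does not state the exact formula the framework uses. The sketch below is only an illustration under the assumption that the delay grows as `initial_delay * backoff_factor ** retry_count` and is capped at `max_delay`. The helper `retry_delay` is hypothetical and not part of the `async_framework` API.

```python
def retry_delay(retry_count, initial_delay=1.0, max_delay=60.0, backoff_factor=2.0):
    """Hypothetical helper (not part of async_framework).

    Assumes capped exponential backoff: the delay before retry number
    `retry_count` (0-based) is initial_delay * backoff_factor ** retry_count,
    never exceeding max_delay.
    """
    return min(initial_delay * backoff_factor ** retry_count, max_delay)


# Delays for the first eight retries, using the values from the
# Configuration example as defaults.
schedule = [retry_delay(n) for n in range(8)]
print(schedule)
```

Under this assumption, the example configuration reaches the 60-second cap on the seventh attempt (`retry_count=6`), so raising `max_retries` well beyond that adds retries at a fixed 60-second spacing.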