# Async Framework
A Python library for Redis-based message queue processing with monitoring and retry capabilities. It supports distributed message processing with automatic retries, dead letter queues, and Prometheus metrics.
## Features
- Redis-based message broker and consumer
- Configurable number of workers
- JSON message format with standardized structure
- Efficient Redis connection pooling
- Prometheus metrics for monitoring
- Automatic and manual retry mechanisms
- Dead letter queue for failed messages
- Batch processing support
- Distributed environment ready
## Requirements
- Python 3.10+
- Redis server
- Dependencies (automatically installed):
  - redis>=4.5.0
  - pydantic>=2.0.0
  - prometheus-client>=0.17.0
  - tenacity>=8.2.0
  - structlog>=23.1.0
## Installation
### Installing from Source
1. Clone the repository:
```bash
git clone https://github.com/yourusername/async-framework.git
cd async-framework
```
2. Install the package:
```bash
pip install .
```
### Building the Package
To build the package as a wheel file:
1. Install build requirements:
```bash
pip install build wheel
```
2. Build the package:
```bash
python -m build
```
This will create two files in the `dist` directory:
- `async_framework-0.1.0-py3-none-any.whl` (Wheel file)
- `async_framework-0.1.0.tar.gz` (Source distribution)
### Installing the Built Package
You can install the wheel file in other projects using:
```bash
pip install async_framework-0.1.0-py3-none-any.whl
```
Or, if the package is published to your private PyPI repository and pip is configured to use that index, install it by name:
```bash
pip install async-framework
```
## Usage
### Configuration
```python
from async_framework import BrokerConfig, RedisConfig, RetryConfig

config = BrokerConfig(
    redis=RedisConfig(
        host="localhost",
        port=6379,
        password="optional_password",
        connection_pool_size=10,
    ),
    retry=RetryConfig(
        max_retries=3,
        initial_delay=1.0,
        max_delay=60.0,
        backoff_factor=2.0,
    ),
    num_workers=4,
    batch_size=10,
    polling_interval=1.0,
    metrics_port=8000,
)
```
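The README does not spell out how `initial_delay`, `max_delay`, and `backoff_factor` combine. The sketch below assumes the common capped exponential backoff formula, `min(initial_delay * backoff_factor ** retry_count, max_delay)`; the function name `retry_delay` is hypothetical and not part of the library, and the framework's actual schedule may differ (for example by adding jitter).

```python
def retry_delay(
    retry_count: int,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_factor: float = 2.0,
) -> float:
    """Delay in seconds before the next attempt (assumed formula, not the library's API)."""
    # Grow the delay geometrically with each retry, then cap it at max_delay.
    return min(initial_delay * backoff_factor ** retry_count, max_delay)


# Delays for the first few retries under the example configuration above.
delays = [retry_delay(n) for n in range(4)]
```

Under this assumption the delay doubles on each attempt until it reaches the `max_delay` cap.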
### Publishing Messages
```python
from async_framework import MessageBroker
# Initialize the broker
broker = MessageBroker(config)
# Publish a message
message = broker.publish(
    queue="my_queue",
    payload={"key": "value"},
    max_retries=3,  # Optional, defaults to config value
)
```
### Consuming Messages
```python
import time

from async_framework import MessageConsumer, Message

# Define a message handler
def process_message(message: Message):
    print(f"Processing message {message.id}: {message.payload}")
    # Your processing logic here
    # Raise an exception to trigger retry mechanism

# Initialize the consumer
consumer = MessageConsumer(
    config=config,
    queue="my_queue",
    handler=process_message,
)

# Start consuming messages
consumer.start()

try:
    # Keep the main thread running
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    consumer.stop()
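As noted in the handler comments, raising an exception is what triggers the retry mechanism. The following is a minimal sketch of a handler that rejects malformed payloads this way. It uses a `SimpleNamespace` stand-in with `id` and `payload` attributes in place of the real `Message` class so it can run without Redis; the `"key"` field and the return value are assumptions for illustration, and the framework may ignore what a handler returns.

```python
from types import SimpleNamespace


def process_message(message):
    """Validate the payload; raising signals the consumer that processing failed."""
    payload = message.payload
    if "key" not in payload:
        # An exception here is what hands the message to the retry mechanism.
        raise ValueError(f"message {message.id} is missing 'key'")
    return payload["key"].upper()


# Stand-in for a Message object (hypothetical; only id and payload are used).
sample = SimpleNamespace(id="example-id", payload={"key": "value"})
result = process_message(sample)
```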
### Monitoring
The framework exposes Prometheus metrics at `http://localhost:8000` (configurable via `metrics_port`):
- `messages_published_total` - Counter of published messages by queue
- `messages_processed_total` - Counter of processed messages by queue and status
- `message_processing_seconds` - Histogram of message processing time by queue
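
To check these metrics from a script, the endpoint's text output can be parsed with the standard library. This is a rough sketch, assuming the endpoint returns the Prometheus text exposition format without per-sample timestamps; the label name `queue` in `SAMPLE` is an assumption based on the descriptions above, and `parse_samples` is a hypothetical helper, not part of the framework.

```python
from urllib.request import urlopen


def parse_samples(text: str) -> dict[str, float]:
    """Map each series (name plus labels) to its value, skipping comment lines."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # HELP/TYPE lines and blanks carry no sample
        # The value is the last space-separated token on the line.
        series, _, value = line.rpartition(" ")
        samples[series] = float(value)
    return samples


def fetch_samples(url: str = "http://localhost:8000") -> dict[str, float]:
    """Fetch and parse metrics from a running consumer or broker process."""
    with urlopen(url) as response:
        return parse_samples(response.read().decode("utf-8"))


# Illustrative input only; real output depends on the running process.
SAMPLE = """\
# TYPE messages_published_total counter
messages_published_total{queue="my_queue"} 3.0
"""
parsed = parse_samples(SAMPLE)
```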
### Queue Structure
The framework uses the following Redis key patterns:
- `queue:{queue_name}` - Main message queue
- `retry:{queue_name}` - Retry queue with scheduled messages
- `dead_letter:{queue_name}` - Dead letter queue for failed messages
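
When inspecting Redis directly, it can help to build these key names in one place. The helper below is a small sketch based only on the patterns listed above; `queue_keys` is a hypothetical name, and the Redis data type behind each key is not documented here, so check it (for example with the `TYPE` command in `redis-cli`) before reading the contents.

```python
def queue_keys(queue_name: str) -> dict[str, str]:
    """Return the Redis keys the framework is documented to use for a queue."""
    return {
        "main": f"queue:{queue_name}",
        "retry": f"retry:{queue_name}",
        "dead_letter": f"dead_letter:{queue_name}",
    }


keys = queue_keys("my_queue")
```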
### Message Format
Messages are stored in JSON format with the following structure:
```json
{
  "id": "uuid",
  "queue": "queue_name",
  "payload": {
    "key": "value"
  },
  "created_at": "2023-08-26T12:00:00Z",
  "retry_count": 0,
  "max_retries": 3,
  "next_retry_at": null,
  "error": null
}
```
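For tooling that reads or writes messages outside the framework, the structure above can be mirrored with a standard-library dataclass. This is a sketch only: `MessageRecord` is a hypothetical stand-in, not the library's `Message` class (which may be a pydantic model with different validation), and the field types are inferred from the example JSON.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


def _utc_now_iso() -> str:
    # Matches the "2023-08-26T12:00:00Z" style shown in the example.
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


@dataclass
class MessageRecord:
    queue: str
    payload: dict[str, Any]
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    created_at: str = field(default_factory=_utc_now_iso)
    retry_count: int = 0
    max_retries: int = 3
    next_retry_at: Optional[str] = None
    error: Optional[str] = None

    def to_json(self) -> str:
        """Serialize to the JSON structure documented above."""
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "MessageRecord":
        """Rebuild a record from its JSON form."""
        return cls(**json.loads(raw))


record = MessageRecord(queue="my_queue", payload={"key": "value"})
restored = MessageRecord.from_json(record.to_json())
```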
## Development
1. Create a virtual environment:
```bash
python -m venv venv
source venv/bin/activate # Linux/MacOS
# or
.\venv\Scripts\activate # Windows
```
2. Install development dependencies:
```bash
pip install -e ".[dev]"
```
3. Run tests:
```bash
pytest
```
## License
MIT License. See LICENSE file for details.