Async Framework
A Python library for Redis-based message queue processing with monitoring and retry capabilities. This framework provides a robust solution for distributed message processing with features like automatic retries, dead letter queues, and Prometheus metrics.
Features
- Redis-based message broker and consumer
- Configurable number of workers
- JSON message format with standardized structure
- Efficient Redis connection pooling
- Prometheus metrics for monitoring
- Automatic and manual retry mechanisms
- Dead letter queue for failed messages
- Batch processing support
- Distributed environment ready
Requirements
- Python 3.10+
- Redis server
- Dependencies (automatically installed):
  - redis>=4.5.0
  - pydantic>=2.0.0
  - prometheus-client>=0.17.0
  - tenacity>=8.2.0
  - structlog>=23.1.0
Installation
Installing from Source
- Clone the repository:
git clone https://github.com/yourusername/async-framework.git
cd async-framework
- Install the package:
pip install .
Building the Package
To build the package as a wheel file:
- Install build requirements:
pip install build wheel
- Build the package:
python -m build
This will create two files in the dist directory:
- async_framework-0.1.0-py3-none-any.whl (wheel file)
- async_framework-0.1.0.tar.gz (source distribution)
Installing the Built Package
You can install the wheel file in other projects using:
pip install async_framework-0.1.0-py3-none-any.whl
Or install directly from your private PyPI repository:
pip install async-framework
Usage
Configuration
from async_framework import BrokerConfig, RedisConfig, RetryConfig

config = BrokerConfig(
    redis=RedisConfig(
        host="localhost",
        port=6379,
        password="optional_password",
        connection_pool_size=10,
    ),
    retry=RetryConfig(
        max_retries=3,
        initial_delay=1.0,
        max_delay=60.0,
        backoff_factor=2.0,
    ),
    num_workers=4,
    batch_size=10,
    polling_interval=1.0,
    metrics_port=8000,
)
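The retry settings suggest an exponential backoff schedule. The sketch below shows how such a schedule could be derived from `initial_delay`, `max_delay`, and `backoff_factor`. The formula and the `retry_delay` helper are assumptions made for illustration; the framework's actual calculation may differ (for example, by adding jitter).

```python
def retry_delay(
    retry_count: int,
    initial_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_factor: float = 2.0,
) -> float:
    """Return the wait in seconds before retry number `retry_count` (0-based).

    Assumed formula: initial_delay * backoff_factor ** retry_count, capped at max_delay.
    """
    delay = initial_delay * backoff_factor ** retry_count
    return min(delay, max_delay)


# Delays for the first eight attempts with the values from the configuration above.
schedule = [retry_delay(n) for n in range(8)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

Under this assumption, the cap in `max_delay` takes effect from the seventh attempt onward, so a large `max_retries` does not produce unbounded waits.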
Publishing Messages
from async_framework import MessageBroker

# Initialize the broker
broker = MessageBroker(config)

# Publish a message
message = broker.publish(
    queue="my_queue",
    payload={"key": "value"},
    max_retries=3,  # Optional, defaults to config value
)
Consuming Messages
import time

from async_framework import MessageConsumer, Message

# Define a message handler
def process_message(message: Message):
    print(f"Processing message {message.id}: {message.payload}")
    # Your processing logic here
    # Raise an exception to trigger the retry mechanism

# Initialize the consumer
consumer = MessageConsumer(
    config=config,
    queue="my_queue",
    handler=process_message,
)

# Start consuming messages
consumer.start()

try:
    # Keep the main thread running
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    consumer.stop()
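As a rough illustration of a handler that signals failure by raising, the sketch below validates the payload before processing it. It runs without Redis or the framework: the `SimpleNamespace` objects are stand-ins for `Message`, and the required `"key"` field is a hypothetical rule chosen to match the publishing example.

```python
from types import SimpleNamespace


def process_message(message) -> None:
    """Validate the payload and raise on bad input so the failure is visible."""
    payload = message.payload
    if not isinstance(payload, dict):
        raise TypeError(f"message {message.id}: payload must be a dict")
    if "key" not in payload:
        # Raising an exception is what marks the message as failed.
        raise ValueError(f"message {message.id}: missing required field 'key'")
    print(f"message {message.id}: key={payload['key']}")


# Stand-ins for real Message objects, used only to exercise the handler locally.
good = SimpleNamespace(id="m-1", payload={"key": "value"})
bad = SimpleNamespace(id="m-2", payload={})

process_message(good)
try:
    process_message(bad)
except ValueError as exc:
    print(f"handler failed, message would be retried: {exc}")
```

Keeping the handler free of broker calls like this makes it easy to unit test before wiring it into a consumer.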
Monitoring
The framework exposes Prometheus metrics at http://localhost:8000 (configurable via metrics_port):
- messages_published_total - Counter of published messages by queue
- messages_processed_total - Counter of processed messages by queue and status
- message_processing_seconds - Histogram of message processing time by queue
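For a quick check from a script, the metrics endpoint can be read as plain text and summed per queue. The sketch below uses only the standard library. The sample text, the `status` label values, and the `/metrics` path are assumptions for illustration, and the regular expressions cover only simple sample lines in the Prometheus text format.

```python
import re
from collections import defaultdict
from urllib.request import urlopen

# Matches "name{labels} value" or "name value" sample lines.
SAMPLE_RE = re.compile(
    r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(?P<labels>.*)\})?\s+(?P<value>\S+)'
)
LABEL_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')


def fetch_metrics(url: str = "http://localhost:8000/metrics") -> str:
    """Download the metrics page as text (requires a running consumer or broker)."""
    with urlopen(url, timeout=5) as response:
        return response.read().decode("utf-8")


def counter_by_queue(text: str, metric: str) -> dict[str, float]:
    """Sum all samples of `metric`, grouped by their `queue` label."""
    totals: dict[str, float] = defaultdict(float)
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        match = SAMPLE_RE.match(line)
        if match is None or match.group("name") != metric:
            continue
        labels = dict(LABEL_RE.findall(match.group("labels") or ""))
        totals[labels.get("queue", "")] += float(match.group("value"))
    return dict(totals)


# Hypothetical scrape output, shaped like the Prometheus text format.
SAMPLE = """\
# HELP messages_processed_total Processed messages
# TYPE messages_processed_total counter
messages_processed_total{queue="my_queue",status="success"} 40.0
messages_processed_total{queue="my_queue",status="failed"} 2.0
messages_processed_total{queue="other",status="success"} 5.0
"""

print(counter_by_queue(SAMPLE, "messages_processed_total"))
```

Against a live process, `counter_by_queue(fetch_metrics(), "messages_published_total")` would give the same kind of per-queue summary.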
Queue Structure
The framework uses the following Redis key patterns:
- queue:{queue_name} - Main message queue
- retry:{queue_name} - Retry queue with scheduled messages
- dead_letter:{queue_name} - Dead letter queue for failed messages
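The key patterns can be expressed as small helpers, which is handy when inspecting queues with a Redis client. The helper names below are hypothetical, and the routing rule in `destination_after_failure` (retry while `retry_count` is below `max_retries`, otherwise dead-letter) is an assumption about how the three queues relate, not a description of the framework's internals.

```python
def queue_key(queue_name: str) -> str:
    """Key of the main message queue."""
    return f"queue:{queue_name}"


def retry_key(queue_name: str) -> str:
    """Key of the retry queue holding scheduled messages."""
    return f"retry:{queue_name}"


def dead_letter_key(queue_name: str) -> str:
    """Key of the dead letter queue for failed messages."""
    return f"dead_letter:{queue_name}"


def destination_after_failure(queue_name: str, retry_count: int, max_retries: int) -> str:
    """Pick the key a failed message would move to (assumed rule).

    `retry_count` is the number of retries already attempted.
    """
    if retry_count < max_retries:
        return retry_key(queue_name)
    return dead_letter_key(queue_name)


print(destination_after_failure("my_queue", retry_count=0, max_retries=3))  # retry:my_queue
print(destination_after_failure("my_queue", retry_count=3, max_retries=3))  # dead_letter:my_queue
```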
Message Format
Messages are stored in JSON format with the following structure, where payload holds your own message data:
{
  "id": "uuid",
  "queue": "queue_name",
  "payload": {
    "key": "value"
  },
  "created_at": "2023-08-26T12:00:00Z",
  "retry_count": 0,
  "max_retries": 3,
  "next_retry_at": null,
  "error": null
}
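To make the structure concrete, the sketch below mirrors the fields in a standard-library dataclass and round-trips it through JSON. `MessageRecord` is a hypothetical stand-in written for illustration; it is not the framework's own `Message` class, which may validate fields differently.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


def _utc_now_iso() -> str:
    """Current UTC time in the same format as the example above."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


@dataclass
class MessageRecord:
    queue: str
    payload: dict[str, Any]
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    created_at: str = field(default_factory=_utc_now_iso)
    retry_count: int = 0
    max_retries: int = 3
    next_retry_at: Optional[str] = None
    error: Optional[str] = None

    def to_json(self) -> str:
        """Serialize to JSON; None values become null."""
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "MessageRecord":
        """Rebuild a record from its JSON form."""
        return cls(**json.loads(raw))


record = MessageRecord(queue="my_queue", payload={"key": "value"})
restored = MessageRecord.from_json(record.to_json())
print(restored == record)  # True
```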
Development
- Create a virtual environment:
python -m venv venv
source venv/bin/activate  # Linux/macOS
# or
.\venv\Scripts\activate  # Windows
- Install development dependencies:
pip install -e ".[dev]"
- Run tests:
pytest
License
MIT License. See LICENSE file for details.