Spaces:
Configuration error
Configuration error
File size: 4,498 Bytes
3243379 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 | # Async Framework
A Python library for Redis-based message queue processing with monitoring and retry capabilities. This framework provides a robust solution for distributed message processing with features like automatic retries, dead letter queues, and Prometheus metrics.
## Features
- Redis-based message broker and consumer
- Configurable number of workers
- JSON message format with standardized structure
- Efficient Redis connection pooling
- Prometheus metrics for monitoring
- Automatic and manual retry mechanisms
- Dead letter queue for failed messages
- Batch processing support
- Distributed environment ready
## Requirements
- Python 3.10+
- Redis server
- Dependencies (automatically installed):
- redis>=4.5.0
- pydantic>=2.0.0
- prometheus-client>=0.17.0
- tenacity>=8.2.0
- structlog>=23.1.0
## Installation
### Installing from Source
1. Clone the repository:
```bash
git clone https://github.com/yourusername/async-framework.git
cd async-framework
```
2. Install the package:
```bash
pip install .
```
### Building the Package
To build the package as a wheel file:
1. Install build requirements:
```bash
pip install build wheel
```
2. Build the package:
```bash
python -m build
```
This will create two files in the `dist` directory:
- `async_framework-0.1.0-py3-none-any.whl` (Wheel file)
- `async_framework-0.1.0.tar.gz` (Source distribution)
### Installing the Built Package
You can install the wheel file in other projects using:
```bash
pip install async_framework-0.1.0-py3-none-any.whl
```
Or install directly from your private PyPI repository:
```bash
pip install async-framework
```
## Usage
### Configuration
```python
from async_framework import BrokerConfig, RedisConfig, RetryConfig
config = BrokerConfig(
redis=RedisConfig(
host="localhost",
port=6379,
password="optional_password",
connection_pool_size=10
),
retry=RetryConfig(
max_retries=3,
initial_delay=1.0,
max_delay=60.0,
backoff_factor=2.0
),
num_workers=4,
batch_size=10,
polling_interval=1.0,
metrics_port=8000
)
```
### Publishing Messages
```python
from async_framework import MessageBroker
# Initialize the broker
broker = MessageBroker(config)
# Publish a message
message = broker.publish(
queue="my_queue",
payload={"key": "value"},
max_retries=3 # Optional, defaults to config value
)
```
### Consuming Messages
```python
from async_framework import MessageConsumer, Message
# Define a message handler
def process_message(message: Message):
print(f"Processing message {message.id}: {message.payload}")
# Your processing logic here
# Raise an exception to trigger retry mechanism
# Initialize the consumer
consumer = MessageConsumer(
config=config,
queue="my_queue",
handler=process_message
)
# Start consuming messages
consumer.start()
try:
# Keep the main thread running
while True:
time.sleep(1)
except KeyboardInterrupt:
consumer.stop()
```
### Monitoring
The framework exposes Prometheus metrics at `http://localhost:8000` (configurable via `metrics_port`):
- `messages_published_total` - Counter of published messages by queue
- `messages_processed_total` - Counter of processed messages by queue and status
- `message_processing_seconds` - Histogram of message processing time by queue
### Queue Structure
The framework uses the following Redis key patterns:
- `queue:{queue_name}` - Main message queue
- `retry:{queue_name}` - Retry queue with scheduled messages
- `dead_letter:{queue_name}` - Dead letter queue for failed messages
### Message Format
Messages are stored in JSON format with the following structure:
```json
{
"id": "uuid",
"queue": "queue_name",
"payload": {
// Your message data
},
"created_at": "2023-08-26T12:00:00Z",
"retry_count": 0,
"max_retries": 3,
"next_retry_at": null,
"error": null
}
```
## Development
1. Create a virtual environment:
```bash
python -m venv venv
source venv/bin/activate # Linux/MacOS
# or
.\\venv\\Scripts\\activate # Windows
```
2. Install development dependencies:
```bash
pip install -e ".[dev]"
```
3. Run tests:
```bash
pytest
```
## License
MIT License. See LICENSE file for details.
|