Spaces:
Configuration error
Configuration error
initial commit
Browse files- README.md +200 -12
- async_framework/__init__.py +7 -0
- async_framework/broker.py +104 -0
- async_framework/config.py +24 -0
- async_framework/consumer.py +202 -0
- async_framework/message.py +35 -0
- pyproject.toml +20 -0
- setup.py +27 -0
README.md
CHANGED
|
@@ -1,12 +1,200 @@
|
|
| 1 |
-
|
| 2 |
-
|
| 3 |
-
|
| 4 |
-
|
| 5 |
-
|
| 6 |
-
|
| 7 |
-
|
| 8 |
-
|
| 9 |
-
|
| 10 |
-
-
|
| 11 |
-
|
| 12 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
# Async Framework
|
| 2 |
+
|
| 3 |
+
A Python library for Redis-based message queue processing with monitoring and retry capabilities. This framework provides a robust solution for distributed message processing with features like automatic retries, dead letter queues, and Prometheus metrics.
|
| 4 |
+
|
| 5 |
+
## Features
|
| 6 |
+
|
| 7 |
+
- Redis-based message broker and consumer
|
| 8 |
+
- Configurable number of workers
|
| 9 |
+
- JSON message format with standardized structure
|
| 10 |
+
- Efficient Redis connection pooling
|
| 11 |
+
- Prometheus metrics for monitoring
|
| 12 |
+
- Automatic and manual retry mechanisms
|
| 13 |
+
- Dead letter queue for failed messages
|
| 14 |
+
- Batch processing support
|
| 15 |
+
- Distributed environment ready
|
| 16 |
+
|
| 17 |
+
## Requirements
|
| 18 |
+
|
| 19 |
+
- Python 3.10+
|
| 20 |
+
- Redis server
|
| 21 |
+
- Dependencies (automatically installed):
|
| 22 |
+
- redis>=4.5.0
|
| 23 |
+
- pydantic>=2.0.0
|
| 24 |
+
- prometheus-client>=0.17.0
|
| 25 |
+
- tenacity>=8.2.0
|
| 26 |
+
- structlog>=23.1.0
|
| 27 |
+
|
| 28 |
+
## Installation
|
| 29 |
+
|
| 30 |
+
### Installing from Source
|
| 31 |
+
1. Clone the repository:
|
| 32 |
+
```bash
|
| 33 |
+
git clone https://github.com/yourusername/async-framework.git
|
| 34 |
+
cd async-framework
|
| 35 |
+
```
|
| 36 |
+
|
| 37 |
+
2. Install the package:
|
| 38 |
+
```bash
|
| 39 |
+
pip install .
|
| 40 |
+
```
|
| 41 |
+
|
| 42 |
+
### Building the Package
|
| 43 |
+
To build the package as a wheel file:
|
| 44 |
+
|
| 45 |
+
1. Install build requirements:
|
| 46 |
+
```bash
|
| 47 |
+
pip install build wheel
|
| 48 |
+
```
|
| 49 |
+
|
| 50 |
+
2. Build the package:
|
| 51 |
+
```bash
|
| 52 |
+
python -m build
|
| 53 |
+
```
|
| 54 |
+
|
| 55 |
+
This will create two files in the `dist` directory:
|
| 56 |
+
- `async_framework-0.1.0-py3-none-any.whl` (Wheel file)
|
| 57 |
+
- `async_framework-0.1.0.tar.gz` (Source distribution)
|
| 58 |
+
|
| 59 |
+
### Installing the Built Package
|
| 60 |
+
|
| 61 |
+
You can install the wheel file in other projects using:
|
| 62 |
+
```bash
|
| 63 |
+
pip install async_framework-0.1.0-py3-none-any.whl
|
| 64 |
+
```
|
| 65 |
+
|
| 66 |
+
Or install directly from your private PyPI repository:
|
| 67 |
+
```bash
|
| 68 |
+
pip install async-framework
|
| 69 |
+
```
|
| 70 |
+
|
| 71 |
+
## Usage
|
| 72 |
+
|
| 73 |
+
### Configuration
|
| 74 |
+
|
| 75 |
+
```python
|
| 76 |
+
from async_framework import BrokerConfig, RedisConfig, RetryConfig
|
| 77 |
+
|
| 78 |
+
config = BrokerConfig(
|
| 79 |
+
redis=RedisConfig(
|
| 80 |
+
host="localhost",
|
| 81 |
+
port=6379,
|
| 82 |
+
password="optional_password",
|
| 83 |
+
connection_pool_size=10
|
| 84 |
+
),
|
| 85 |
+
retry=RetryConfig(
|
| 86 |
+
max_retries=3,
|
| 87 |
+
initial_delay=1.0,
|
| 88 |
+
max_delay=60.0,
|
| 89 |
+
backoff_factor=2.0
|
| 90 |
+
),
|
| 91 |
+
num_workers=4,
|
| 92 |
+
batch_size=10,
|
| 93 |
+
polling_interval=1.0,
|
| 94 |
+
metrics_port=8000
|
| 95 |
+
)
|
| 96 |
+
```
|
| 97 |
+
|
| 98 |
+
### Publishing Messages
|
| 99 |
+
|
| 100 |
+
```python
|
| 101 |
+
from async_framework import MessageBroker
|
| 102 |
+
|
| 103 |
+
# Initialize the broker
|
| 104 |
+
broker = MessageBroker(config)
|
| 105 |
+
|
| 106 |
+
# Publish a message
|
| 107 |
+
message = broker.publish(
|
| 108 |
+
queue="my_queue",
|
| 109 |
+
payload={"key": "value"},
|
| 110 |
+
max_retries=3 # Optional, defaults to config value
|
| 111 |
+
)
|
| 112 |
+
```
|
| 113 |
+
|
| 114 |
+
### Consuming Messages
|
| 115 |
+
|
| 116 |
+
```python
|
| 117 |
+
from async_framework import MessageConsumer, Message
|
| 118 |
+
|
| 119 |
+
# Define a message handler
|
| 120 |
+
def process_message(message: Message):
|
| 121 |
+
print(f"Processing message {message.id}: {message.payload}")
|
| 122 |
+
# Your processing logic here
|
| 123 |
+
# Raise an exception to trigger retry mechanism
|
| 124 |
+
|
| 125 |
+
# Initialize the consumer
|
| 126 |
+
consumer = MessageConsumer(
|
| 127 |
+
config=config,
|
| 128 |
+
queue="my_queue",
|
| 129 |
+
handler=process_message
|
| 130 |
+
)
|
| 131 |
+
|
| 132 |
+
# Start consuming messages
|
| 133 |
+
consumer.start()
|
| 134 |
+
|
| 135 |
+
try:
|
| 136 |
+
# Keep the main thread running
|
| 137 |
+
while True:
|
| 138 |
+
time.sleep(1)
|
| 139 |
+
except KeyboardInterrupt:
|
| 140 |
+
consumer.stop()
|
| 141 |
+
```
|
| 142 |
+
|
| 143 |
+
### Monitoring
|
| 144 |
+
|
| 145 |
+
The framework exposes Prometheus metrics at `http://localhost:8000` (configurable via `metrics_port`):
|
| 146 |
+
|
| 147 |
+
- `messages_published_total` - Counter of published messages by queue
|
| 148 |
+
- `messages_processed_total` - Counter of processed messages by queue and status
|
| 149 |
+
- `message_processing_seconds` - Histogram of message processing time by queue
|
| 150 |
+
|
| 151 |
+
### Queue Structure
|
| 152 |
+
|
| 153 |
+
The framework uses the following Redis key patterns:
|
| 154 |
+
|
| 155 |
+
- `queue:{queue_name}` - Main message queue
|
| 156 |
+
- `retry:{queue_name}` - Retry queue with scheduled messages
|
| 157 |
+
- `dead_letter:{queue_name}` - Dead letter queue for failed messages
|
| 158 |
+
|
| 159 |
+
### Message Format
|
| 160 |
+
|
| 161 |
+
Messages are stored in JSON format with the following structure:
|
| 162 |
+
|
| 163 |
+
```json
|
| 164 |
+
{
|
| 165 |
+
"id": "uuid",
|
| 166 |
+
"queue": "queue_name",
|
| 167 |
+
"payload": {
|
| 168 |
+
// Your message data
|
| 169 |
+
},
|
| 170 |
+
"created_at": "2023-08-26T12:00:00Z",
|
| 171 |
+
"retry_count": 0,
|
| 172 |
+
"max_retries": 3,
|
| 173 |
+
"next_retry_at": null,
|
| 174 |
+
"error": null
|
| 175 |
+
}
|
| 176 |
+
```
|
| 177 |
+
|
| 178 |
+
## Development
|
| 179 |
+
|
| 180 |
+
1. Create a virtual environment:
|
| 181 |
+
```bash
|
| 182 |
+
python -m venv venv
|
| 183 |
+
source venv/bin/activate # Linux/MacOS
|
| 184 |
+
# or
|
| 185 |
+
.\\venv\\Scripts\\activate # Windows
|
| 186 |
+
```
|
| 187 |
+
|
| 188 |
+
2. Install development dependencies:
|
| 189 |
+
```bash
|
| 190 |
+
pip install -e ".[dev]"
|
| 191 |
+
```
|
| 192 |
+
|
| 193 |
+
3. Run tests:
|
| 194 |
+
```bash
|
| 195 |
+
pytest
|
| 196 |
+
```
|
| 197 |
+
|
| 198 |
+
## License
|
| 199 |
+
|
| 200 |
+
MIT License. See LICENSE file for details.
|
async_framework/__init__.py
ADDED
|
@@ -0,0 +1,7 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
from .broker import MessageBroker
|
| 2 |
+
from .consumer import MessageConsumer
|
| 3 |
+
from .config import BrokerConfig, RedisConfig, RetryConfig
|
| 4 |
+
from .message import Message
|
| 5 |
+
|
| 6 |
+
__version__ = "0.1.0"
|
| 7 |
+
__all__ = ["MessageBroker", "MessageConsumer", "BrokerConfig", "RedisConfig", "RetryConfig", "Message"]
|
async_framework/broker.py
ADDED
|
@@ -0,0 +1,104 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
import redis
|
| 2 |
+
from typing import Dict, Any
|
| 3 |
+
from datetime import datetime, timedelta
|
| 4 |
+
import structlog
|
| 5 |
+
from prometheus_client import Counter, start_http_server
|
| 6 |
+
|
| 7 |
+
from .config import BrokerConfig
|
| 8 |
+
from .message import Message
|
| 9 |
+
|
| 10 |
+
logger = structlog.get_logger()
|
| 11 |
+
|
| 12 |
+
messages_published = Counter(
|
| 13 |
+
"messages_published_total",
|
| 14 |
+
"Total number of messages published",
|
| 15 |
+
["queue"]
|
| 16 |
+
)
|
| 17 |
+
|
| 18 |
+
class MessageBroker:
|
| 19 |
+
def __init__(self, config: BrokerConfig):
|
| 20 |
+
self.config = config
|
| 21 |
+
logger.info("Creating Redis connection pool",
|
| 22 |
+
host=config.redis.host,
|
| 23 |
+
port=config.redis.port,
|
| 24 |
+
ssl=config.redis.ssl)
|
| 25 |
+
|
| 26 |
+
# Base connection parameters
|
| 27 |
+
connection_params = {
|
| 28 |
+
"host": config.redis.host,
|
| 29 |
+
"port": config.redis.port,
|
| 30 |
+
"db": config.redis.db,
|
| 31 |
+
"password": config.redis.password,
|
| 32 |
+
"decode_responses": True,
|
| 33 |
+
"max_connections": config.redis.connection_pool_size
|
| 34 |
+
}
|
| 35 |
+
|
| 36 |
+
# Add SSL configuration if enabled
|
| 37 |
+
if config.redis.ssl:
|
| 38 |
+
connection_params.update({
|
| 39 |
+
"ssl": True,
|
| 40 |
+
"ssl_cert_reqs": None,
|
| 41 |
+
"ssl_ca_certs": None
|
| 42 |
+
})
|
| 43 |
+
|
| 44 |
+
connection_pool = redis.ConnectionPool(**connection_params)
|
| 45 |
+
self._redis = redis.Redis(connection_pool=connection_pool)
|
| 46 |
+
# Start Prometheus metrics server
|
| 47 |
+
start_http_server(config.metrics_port)
|
| 48 |
+
logger.info("Message broker initialized", config=config.dict())
|
| 49 |
+
|
| 50 |
+
def publish(self, queue: str, payload: Dict[str, Any], max_retries: int = None) -> Message:
|
| 51 |
+
"""Publish a message to a queue."""
|
| 52 |
+
message = Message(
|
| 53 |
+
queue=queue,
|
| 54 |
+
payload=payload,
|
| 55 |
+
max_retries=max_retries or self.config.retry.max_retries
|
| 56 |
+
)
|
| 57 |
+
|
| 58 |
+
try:
|
| 59 |
+
self._redis.lpush(f"queue:{queue}", message.to_json())
|
| 60 |
+
messages_published.labels(queue=queue).inc()
|
| 61 |
+
logger.info("Message published", message_id=message.id, queue=queue)
|
| 62 |
+
return message
|
| 63 |
+
except redis.RedisError as e:
|
| 64 |
+
logger.error("Failed to publish message", error=str(e), queue=queue)
|
| 65 |
+
raise
|
| 66 |
+
|
| 67 |
+
def retry_message(self, message: Message) -> None:
|
| 68 |
+
"""Move a message to the retry queue."""
|
| 69 |
+
if message.retry_count >= message.max_retries:
|
| 70 |
+
self._move_to_dead_letter(message)
|
| 71 |
+
return
|
| 72 |
+
|
| 73 |
+
message.retry_count += 1
|
| 74 |
+
delay = min(
|
| 75 |
+
self.config.retry.initial_delay * (self.config.retry.backoff_factor ** (message.retry_count - 1)),
|
| 76 |
+
self.config.retry.max_delay
|
| 77 |
+
)
|
| 78 |
+
message.next_retry_at = datetime.utcnow() + timedelta(seconds=delay)
|
| 79 |
+
|
| 80 |
+
try:
|
| 81 |
+
self._redis.lpush(f"retry:{message.queue}", message.to_json())
|
| 82 |
+
logger.info(
|
| 83 |
+
"Message moved to retry queue",
|
| 84 |
+
message_id=message.id,
|
| 85 |
+
queue=message.queue,
|
| 86 |
+
retry_count=message.retry_count
|
| 87 |
+
)
|
| 88 |
+
except redis.RedisError as e:
|
| 89 |
+
logger.error("Failed to move message to retry queue", error=str(e))
|
| 90 |
+
raise
|
| 91 |
+
|
| 92 |
+
def _move_to_dead_letter(self, message: Message) -> None:
|
| 93 |
+
"""Move a message to the dead letter queue."""
|
| 94 |
+
try:
|
| 95 |
+
self._redis.lpush(f"dead_letter:{message.queue}", message.to_json())
|
| 96 |
+
logger.warning(
|
| 97 |
+
"Message moved to dead letter queue",
|
| 98 |
+
message_id=message.id,
|
| 99 |
+
queue=message.queue,
|
| 100 |
+
retry_count=message.retry_count
|
| 101 |
+
)
|
| 102 |
+
except redis.RedisError as e:
|
| 103 |
+
logger.error("Failed to move message to dead letter queue", error=str(e))
|
| 104 |
+
raise
|
async_framework/config.py
ADDED
|
@@ -0,0 +1,24 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
from typing import Optional
|
| 2 |
+
from pydantic import BaseModel, Field
|
| 3 |
+
|
| 4 |
+
class RedisConfig(BaseModel):
|
| 5 |
+
host: str = Field(default="localhost")
|
| 6 |
+
port: int = Field(default=6379)
|
| 7 |
+
db: int = Field(default=0)
|
| 8 |
+
password: Optional[str] = Field(default=None)
|
| 9 |
+
ssl: bool = Field(default=False)
|
| 10 |
+
connection_pool_size: int = Field(default=10)
|
| 11 |
+
|
| 12 |
+
class RetryConfig(BaseModel):
|
| 13 |
+
max_retries: int = Field(default=3)
|
| 14 |
+
initial_delay: float = Field(default=1.0) # seconds
|
| 15 |
+
max_delay: float = Field(default=60.0) # seconds
|
| 16 |
+
backoff_factor: float = Field(default=2.0)
|
| 17 |
+
|
| 18 |
+
class BrokerConfig(BaseModel):
|
| 19 |
+
redis: RedisConfig = Field(default_factory=RedisConfig)
|
| 20 |
+
retry: RetryConfig = Field(default_factory=RetryConfig)
|
| 21 |
+
num_workers: int = Field(default=1)
|
| 22 |
+
batch_size: int = Field(default=10)
|
| 23 |
+
polling_interval: float = Field(default=1.0) # seconds
|
| 24 |
+
metrics_port: int = Field(default=8000)
|
async_framework/consumer.py
ADDED
|
@@ -0,0 +1,202 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
import redis
|
| 2 |
+
from typing import Callable, List, Optional
|
| 3 |
+
import time
|
| 4 |
+
from concurrent.futures import ThreadPoolExecutor
|
| 5 |
+
import structlog
|
| 6 |
+
from prometheus_client import Counter, Histogram
|
| 7 |
+
|
| 8 |
+
from .config import BrokerConfig
|
| 9 |
+
from .message import Message
|
| 10 |
+
|
| 11 |
+
logger = structlog.get_logger()
|
| 12 |
+
|
| 13 |
+
messages_processed = Counter(
|
| 14 |
+
"messages_processed_total",
|
| 15 |
+
"Total number of messages processed",
|
| 16 |
+
["queue", "status"]
|
| 17 |
+
)
|
| 18 |
+
|
| 19 |
+
processing_time = Histogram(
|
| 20 |
+
"message_processing_seconds",
|
| 21 |
+
"Time spent processing messages",
|
| 22 |
+
["queue"]
|
| 23 |
+
)
|
| 24 |
+
|
| 25 |
+
class MessageConsumer:
|
| 26 |
+
def __init__(self, config: BrokerConfig, queue: str, handler: Callable[[Message], None]):
|
| 27 |
+
self.config = config
|
| 28 |
+
self.queue = queue
|
| 29 |
+
self.handler = handler
|
| 30 |
+
logger.info("Creating Redis connection pool",
|
| 31 |
+
host=config.redis.host,
|
| 32 |
+
port=config.redis.port,
|
| 33 |
+
ssl=config.redis.ssl)
|
| 34 |
+
|
| 35 |
+
connection_params = {
|
| 36 |
+
"host": config.redis.host,
|
| 37 |
+
"port": config.redis.port,
|
| 38 |
+
"db": config.redis.db,
|
| 39 |
+
"password": config.redis.password,
|
| 40 |
+
"decode_responses": True,
|
| 41 |
+
"max_connections": config.redis.connection_pool_size
|
| 42 |
+
}
|
| 43 |
+
|
| 44 |
+
if config.redis.ssl:
|
| 45 |
+
connection_params.update({
|
| 46 |
+
"ssl": True,
|
| 47 |
+
"ssl_cert_reqs": None,
|
| 48 |
+
"ssl_ca_certs": None
|
| 49 |
+
})
|
| 50 |
+
|
| 51 |
+
connection_pool = redis.ConnectionPool(**connection_params)
|
| 52 |
+
self._redis = redis.Redis(connection_pool=connection_pool)
|
| 53 |
+
self._executor = ThreadPoolExecutor(max_workers=config.num_workers)
|
| 54 |
+
self._running = False
|
| 55 |
+
logger.info("Message consumer initialized", queue=queue, config=config.dict())
|
| 56 |
+
|
| 57 |
+
def start(self) -> None:
|
| 58 |
+
"""Start consuming messages."""
|
| 59 |
+
self._running = True
|
| 60 |
+
self._executor.submit(self._process_retry_queue)
|
| 61 |
+
logger.info("self.config.num_workers")
|
| 62 |
+
for _ in range(self.config.num_workers):
|
| 63 |
+
logger.info("-----------------")
|
| 64 |
+
self._executor.submit(self._consume)
|
| 65 |
+
logger.info("Consumer started", queue=self.queue)
|
| 66 |
+
|
| 67 |
+
def stop(self) -> None:
|
| 68 |
+
"""Stop consuming messages."""
|
| 69 |
+
self._running = False
|
| 70 |
+
self._executor.shutdown(wait=True)
|
| 71 |
+
logger.info("Consumer stopped", queue=self.queue)
|
| 72 |
+
|
| 73 |
+
def _consume(self) -> None:
|
| 74 |
+
"""Consume messages from the queue."""
|
| 75 |
+
logger.info("Consumer thread started", queue=self.queue)
|
| 76 |
+
while self._running:
|
| 77 |
+
try:
|
| 78 |
+
messages = self._batch_pop_messages()
|
| 79 |
+
if messages:
|
| 80 |
+
logger.info("Received messages", queue=self.queue, count=len(messages))
|
| 81 |
+
for message_data in messages:
|
| 82 |
+
self._process_message(Message.from_json(message_data))
|
| 83 |
+
else:
|
| 84 |
+
# Small sleep to prevent CPU spinning when queue is empty
|
| 85 |
+
time.sleep(0.1)
|
| 86 |
+
except Exception as e:
|
| 87 |
+
logger.error("Error in consumer loop", error=str(e), queue=self.queue)
|
| 88 |
+
time.sleep(1)
|
| 89 |
+
|
| 90 |
+
def _batch_pop_messages(self) -> List[str]:
|
| 91 |
+
"""Pop a batch of messages from the queue."""
|
| 92 |
+
messages = []
|
| 93 |
+
try:
|
| 94 |
+
# Using brpop instead of rpop for blocking operation
|
| 95 |
+
result = self._redis.brpop([f"queue:{self.queue}"], timeout=1)
|
| 96 |
+
if result:
|
| 97 |
+
messages.append(result[1]) # brpop returns (key, value) tuple
|
| 98 |
+
|
| 99 |
+
# Try to get more messages up to batch size
|
| 100 |
+
for _ in range(self.config.batch_size - 1):
|
| 101 |
+
msg = self._redis.rpop(f"queue:{self.queue}")
|
| 102 |
+
if msg:
|
| 103 |
+
messages.append(msg)
|
| 104 |
+
else:
|
| 105 |
+
break
|
| 106 |
+
|
| 107 |
+
logger.debug("Batch pop result",
|
| 108 |
+
queue=self.queue,
|
| 109 |
+
messages_count=len(messages))
|
| 110 |
+
return messages
|
| 111 |
+
except Exception as e:
|
| 112 |
+
logger.error("Error in batch pop", error=str(e), queue=self.queue)
|
| 113 |
+
return []
|
| 114 |
+
|
| 115 |
+
def _process_message(self, message: Message) -> None:
|
| 116 |
+
"""Process a single message."""
|
| 117 |
+
with processing_time.labels(queue=self.queue).time():
|
| 118 |
+
try:
|
| 119 |
+
self.handler(message)
|
| 120 |
+
messages_processed.labels(
|
| 121 |
+
queue=self.queue, status="success"
|
| 122 |
+
).inc()
|
| 123 |
+
logger.info(
|
| 124 |
+
"Message processed successfully",
|
| 125 |
+
message_id=message.id,
|
| 126 |
+
queue=self.queue
|
| 127 |
+
)
|
| 128 |
+
except Exception as e:
|
| 129 |
+
messages_processed.labels(
|
| 130 |
+
queue=self.queue, status="error"
|
| 131 |
+
).inc()
|
| 132 |
+
message.error = str(e)
|
| 133 |
+
self._handle_processing_error(message)
|
| 134 |
+
|
| 135 |
+
def _handle_processing_error(self, message: Message) -> None:
|
| 136 |
+
"""Handle a message processing error."""
|
| 137 |
+
if message.retry_count < message.max_retries:
|
| 138 |
+
self._retry_message(message)
|
| 139 |
+
else:
|
| 140 |
+
self._move_to_dead_letter(message)
|
| 141 |
+
|
| 142 |
+
def _retry_message(self, message: Message) -> None:
|
| 143 |
+
"""Move a message to the retry queue with exponential backoff."""
|
| 144 |
+
message.retry_count += 1
|
| 145 |
+
delay = min(
|
| 146 |
+
self.config.retry.initial_delay *
|
| 147 |
+
(self.config.retry.backoff_factor ** (message.retry_count - 1)),
|
| 148 |
+
self.config.retry.max_delay
|
| 149 |
+
)
|
| 150 |
+
self._redis.zadd(
|
| 151 |
+
f"retry:{self.queue}",
|
| 152 |
+
{message.to_json(): time.time() + delay}
|
| 153 |
+
)
|
| 154 |
+
logger.info(
|
| 155 |
+
"Message scheduled for retry",
|
| 156 |
+
message_id=message.id,
|
| 157 |
+
queue=self.queue,
|
| 158 |
+
retry_count=message.retry_count,
|
| 159 |
+
delay=delay
|
| 160 |
+
)
|
| 161 |
+
|
| 162 |
+
def _process_retry_queue(self) -> None:
|
| 163 |
+
"""Process messages in the retry queue."""
|
| 164 |
+
while self._running:
|
| 165 |
+
try:
|
| 166 |
+
# Get messages that are ready to be retried
|
| 167 |
+
messages = self._redis.zrangebyscore(
|
| 168 |
+
f"retry:{self.queue}",
|
| 169 |
+
"-inf",
|
| 170 |
+
time.time(),
|
| 171 |
+
start=0,
|
| 172 |
+
num=self.config.batch_size
|
| 173 |
+
)
|
| 174 |
+
|
| 175 |
+
if not messages:
|
| 176 |
+
time.sleep(self.config.polling_interval)
|
| 177 |
+
continue
|
| 178 |
+
|
| 179 |
+
# Remove the processed messages from the retry queue
|
| 180 |
+
pipeline = self._redis.pipeline()
|
| 181 |
+
for message_data in messages:
|
| 182 |
+
#message = Message.from_json(message_data)
|
| 183 |
+
pipeline.zrem(f"retry:{self.queue}", message_data)
|
| 184 |
+
pipeline.lpush(f"queue:{self.queue}", message_data)
|
| 185 |
+
pipeline.execute()
|
| 186 |
+
|
| 187 |
+
except Exception as e:
|
| 188 |
+
logger.error("Error processing retry queue", error=str(e))
|
| 189 |
+
time.sleep(1)
|
| 190 |
+
|
| 191 |
+
def _move_to_dead_letter(self, message: Message) -> None:
|
| 192 |
+
"""Move a message to the dead letter queue."""
|
| 193 |
+
try:
|
| 194 |
+
self._redis.lpush(f"dead_letter:{self.queue}", message.to_json())
|
| 195 |
+
logger.warning(
|
| 196 |
+
"Message moved to dead letter queue",
|
| 197 |
+
message_id=message.id,
|
| 198 |
+
queue=self.queue,
|
| 199 |
+
error=message.error
|
| 200 |
+
)
|
| 201 |
+
except redis.RedisError as e:
|
| 202 |
+
logger.error("Failed to move message to dead letter queue", error=str(e))
|
async_framework/message.py
ADDED
|
@@ -0,0 +1,35 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
from datetime import datetime
|
| 2 |
+
from typing import Any, Dict, Optional
|
| 3 |
+
from pydantic import BaseModel, Field
|
| 4 |
+
import json
|
| 5 |
+
import uuid
|
| 6 |
+
|
| 7 |
+
class Message(BaseModel):
|
| 8 |
+
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
|
| 9 |
+
queue: str
|
| 10 |
+
payload: Dict[str, Any]
|
| 11 |
+
created_at: datetime = Field(default_factory=datetime.utcnow)
|
| 12 |
+
retry_count: int = Field(default=0)
|
| 13 |
+
max_retries: int = Field(default=3)
|
| 14 |
+
next_retry_at: Optional[datetime] = Field(default=None)
|
| 15 |
+
error: Optional[str] = Field(default=None)
|
| 16 |
+
|
| 17 |
+
def to_json(self) -> str:
|
| 18 |
+
return json.dumps({
|
| 19 |
+
"id": self.id,
|
| 20 |
+
"queue": self.queue,
|
| 21 |
+
"payload": self.payload,
|
| 22 |
+
"created_at": self.created_at.isoformat(),
|
| 23 |
+
"retry_count": self.retry_count,
|
| 24 |
+
"max_retries": self.max_retries,
|
| 25 |
+
"next_retry_at": self.next_retry_at.isoformat() if self.next_retry_at else None,
|
| 26 |
+
"error": self.error
|
| 27 |
+
})
|
| 28 |
+
|
| 29 |
+
@classmethod
|
| 30 |
+
def from_json(cls, data: str) -> "Message":
|
| 31 |
+
json_data = json.loads(data)
|
| 32 |
+
json_data["created_at"] = datetime.fromisoformat(json_data["created_at"])
|
| 33 |
+
if json_data["next_retry_at"]:
|
| 34 |
+
json_data["next_retry_at"] = datetime.fromisoformat(json_data["next_retry_at"])
|
| 35 |
+
return cls(**json_data)
|
pyproject.toml
ADDED
|
@@ -0,0 +1,20 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
[build-system]
|
| 2 |
+
requires = ["setuptools>=45", "wheel>=0.36.0", "build>=0.7.0"]
|
| 3 |
+
build-backend = "setuptools.build_meta"
|
| 4 |
+
|
| 5 |
+
[project]
|
| 6 |
+
name = "async-framework"
|
| 7 |
+
version = "0.1.0"
|
| 8 |
+
description = "A Redis-based message queue framework with monitoring and retry capabilities"
|
| 9 |
+
readme = "README.md"
|
| 10 |
+
requires-python = ">=3.10"
|
| 11 |
+
license = {text = "MIT"}
|
| 12 |
+
classifiers = [
|
| 13 |
+
"Development Status :: 3 - Alpha",
|
| 14 |
+
"Intended Audience :: Developers",
|
| 15 |
+
"Programming Language :: Python :: 3.10",
|
| 16 |
+
"Programming Language :: Python :: 3.11",
|
| 17 |
+
]
|
| 18 |
+
|
| 19 |
+
[tool.setuptools]
|
| 20 |
+
packages = ["async_framework"]
|
setup.py
ADDED
|
@@ -0,0 +1,27 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
from setuptools import setup, find_packages
|
| 2 |
+
|
| 3 |
+
setup(
|
| 4 |
+
name="async-framework",
|
| 5 |
+
version="0.1.0",
|
| 6 |
+
packages=find_packages(exclude=["tests*"]),
|
| 7 |
+
install_requires=[
|
| 8 |
+
"redis>=4.5.0",
|
| 9 |
+
"pydantic>=2.0.0",
|
| 10 |
+
"prometheus-client>=0.17.0",
|
| 11 |
+
"tenacity>=8.2.0",
|
| 12 |
+
"structlog>=23.1.0",
|
| 13 |
+
],
|
| 14 |
+
python_requires=">=3.10",
|
| 15 |
+
author="Your Name",
|
| 16 |
+
author_email="your.email@example.com",
|
| 17 |
+
description="A Redis-based message queue framework with monitoring and retry capabilities",
|
| 18 |
+
long_description=open("README.md").read(),
|
| 19 |
+
long_description_content_type="text/markdown",
|
| 20 |
+
keywords="redis, queue, broker, consumer, async",
|
| 21 |
+
classifiers=[
|
| 22 |
+
"Development Status :: 3 - Alpha",
|
| 23 |
+
"Intended Audience :: Developers",
|
| 24 |
+
"Programming Language :: Python :: 3.10",
|
| 25 |
+
"Programming Language :: Python :: 3.11",
|
| 26 |
+
],
|
| 27 |
+
)
|