Spaces:
Configuration error
Configuration error
| # Async Framework | |
| A Python library for Redis-based message queue processing with monitoring and retry capabilities. This framework provides a robust solution for distributed message processing with features like automatic retries, dead letter queues, and Prometheus metrics. | |
| ## Features | |
| - Redis-based message broker and consumer | |
| - Configurable number of workers | |
| - JSON message format with standardized structure | |
| - Efficient Redis connection pooling | |
| - Prometheus metrics for monitoring | |
| - Automatic and manual retry mechanisms | |
| - Dead letter queue for failed messages | |
| - Batch processing support | |
| - Distributed environment ready | |
| ## Requirements | |
| - Python 3.10+ | |
| - Redis server | |
| - Dependencies (automatically installed): | |
| - redis>=4.5.0 | |
| - pydantic>=2.0.0 | |
| - prometheus-client>=0.17.0 | |
| - tenacity>=8.2.0 | |
| - structlog>=23.1.0 | |
| ## Installation | |
| ### Installing from Source | |
| 1. Clone the repository: | |
| ```bash | |
| git clone https://github.com/yourusername/async-framework.git | |
| cd async-framework | |
| ``` | |
| 2. Install the package: | |
| ```bash | |
| pip install . | |
| ``` | |
| ### Building the Package | |
| To build the package as a wheel file: | |
| 1. Install build requirements: | |
| ```bash | |
| pip install build wheel | |
| ``` | |
| 2. Build the package: | |
| ```bash | |
| python -m build | |
| ``` | |
| This will create two files in the `dist` directory: | |
| - `async_framework-0.1.0-py3-none-any.whl` (Wheel file) | |
| - `async_framework-0.1.0.tar.gz` (Source distribution) | |
| ### Installing the Built Package | |
| You can install the wheel file in other projects using: | |
| ```bash | |
| pip install async_framework-0.1.0-py3-none-any.whl | |
| ``` | |
| Or install directly from your private PyPI repository: | |
| ```bash | |
| pip install async-framework | |
| ``` | |
| ## Usage | |
| ### Configuration | |
| ```python | |
| from async_framework import BrokerConfig, RedisConfig, RetryConfig | |
| config = BrokerConfig( | |
| redis=RedisConfig( | |
| host="localhost", | |
| port=6379, | |
| password="optional_password", | |
| connection_pool_size=10 | |
| ), | |
| retry=RetryConfig( | |
| max_retries=3, | |
| initial_delay=1.0, | |
| max_delay=60.0, | |
| backoff_factor=2.0 | |
| ), | |
| num_workers=4, | |
| batch_size=10, | |
| polling_interval=1.0, | |
| metrics_port=8000 | |
| ) | |
| ``` | |
| ### Publishing Messages | |
| ```python | |
| from async_framework import MessageBroker | |
| # Initialize the broker | |
| broker = MessageBroker(config) | |
| # Publish a message | |
| message = broker.publish( | |
| queue="my_queue", | |
| payload={"key": "value"}, | |
| max_retries=3 # Optional, defaults to config value | |
| ) | |
| ``` | |
| ### Consuming Messages | |
| ```python | |
| from async_framework import MessageConsumer, Message | |
| # Define a message handler | |
| def process_message(message: Message): | |
| print(f"Processing message {message.id}: {message.payload}") | |
| # Your processing logic here | |
| # Raise an exception to trigger retry mechanism | |
| # Initialize the consumer | |
| consumer = MessageConsumer( | |
| config=config, | |
| queue="my_queue", | |
| handler=process_message | |
| ) | |
| # Start consuming messages | |
| consumer.start() | |
| try: | |
| # Keep the main thread running | |
| while True: | |
| time.sleep(1) | |
| except KeyboardInterrupt: | |
| consumer.stop() | |
| ``` | |
| ### Monitoring | |
| The framework exposes Prometheus metrics at `http://localhost:8000` (configurable via `metrics_port`): | |
| - `messages_published_total` - Counter of published messages by queue | |
| - `messages_processed_total` - Counter of processed messages by queue and status | |
| - `message_processing_seconds` - Histogram of message processing time by queue | |
| ### Queue Structure | |
| The framework uses the following Redis key patterns: | |
| - `queue:{queue_name}` - Main message queue | |
| - `retry:{queue_name}` - Retry queue with scheduled messages | |
| - `dead_letter:{queue_name}` - Dead letter queue for failed messages | |
| ### Message Format | |
| Messages are stored in JSON format with the following structure: | |
| ```json | |
| { | |
| "id": "uuid", | |
| "queue": "queue_name", | |
| "payload": { | |
| // Your message data | |
| }, | |
| "created_at": "2023-08-26T12:00:00Z", | |
| "retry_count": 0, | |
| "max_retries": 3, | |
| "next_retry_at": null, | |
| "error": null | |
| } | |
| ``` | |
| ## Development | |
| 1. Create a virtual environment: | |
| ```bash | |
| python -m venv venv | |
| source venv/bin/activate # Linux/MacOS | |
| # or | |
| .\\venv\\Scripts\\activate # Windows | |
| ``` | |
| 2. Install development dependencies: | |
| ```bash | |
| pip install -e ".[dev]" | |
| ``` | |
| 3. Run tests: | |
| ```bash | |
| pytest | |
| ``` | |
| ## License | |
| MIT License. See LICENSE file for details. | |