Prabha-AIMLOPS commited on
Commit
2452e46
·
verified ·
1 Parent(s): ef03531

initial files

Browse files
Files changed (4) hide show
  1. README.md +74 -11
  2. consumers.py +83 -0
  3. message_generators.py +40 -0
  4. publishers.py +77 -0
README.md CHANGED
@@ -1,11 +1,74 @@
1
- ---
2
- title: Async Framework Test
3
- emoji: 📈
4
- colorFrom: gray
5
- colorTo: purple
6
- sdk: docker
7
- pinned: false
8
- short_description: to test the pub/sub
9
- ---
10
-
11
- Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Redis Message Queue Test Project
2
+
3
+ This project demonstrates the usage of the async-framework library with multiple queues, publishers, and consumers.
4
+
5
+ ## Project Structure
6
+ - `config.py` - Configuration for Redis and broker settings
7
+ - `message_generators.py` - Message generators for different queue types
8
+ - `publishers.py` - Publishers for different queues
9
+ - `consumers.py` - Consumers with message handlers
10
+
11
+ ## Setup
12
+
13
+ 1. Install Redis:
14
+ - Windows: Download from [Redis Windows](https://github.com/microsoftarchive/redis/releases)
15
+ - Linux: `sudo apt-get install redis-server`
16
+ - MacOS: `brew install redis`
17
+
18
+ 2. Create and activate virtual environment:
19
+ ```bash
20
+ python -m venv venv
21
+ .\venv\Scripts\activate # Windows
22
+ source venv/bin/activate # Linux/MacOS
23
+ ```
24
+
25
+ 3. Install dependencies:
26
+ ```bash
27
+ pip install async-framework structlog
28
+ ```
29
+
30
+ 4. Start Redis server:
31
+ ```bash
32
+ redis-server
33
+ ```
34
+
35
+ ## Running the Test
36
+
37
+ 1. Start the consumers:
38
+ ```bash
39
+ python consumers.py
40
+ ```
41
+
42
+ 2. In a new terminal, start the publishers:
43
+ ```bash
44
+ python publishers.py
45
+ ```
46
+
47
+ ## Message Types
48
+
49
+ 1. Orders Queue:
50
+ - Order details with ID, customer, amount
51
+ - Simulated processing time: 0.5s
52
+ - Fails for orders > $900
53
+
54
+ 2. Notifications Queue:
55
+ - Notification details with type and priority
56
+ - Simulated processing time: 0.3s
57
+ - High priority notifications fail twice before success
58
+
59
+ 3. Analytics Queue:
60
+ - Event tracking data
61
+ - Simulated processing time: 0.1s
62
+ - No simulated failures
63
+
64
+ ## Monitoring
65
+
66
+ Access Prometheus metrics at:
67
+ ```
68
+ http://localhost:8000
69
+ ```
70
+
71
+ Available metrics:
72
+ - messages_published_total
73
+ - messages_processed_total
74
+ - message_processing_seconds
consumers.py ADDED
@@ -0,0 +1,83 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from async_framework import MessageConsumer, Message
2
+ import time
3
+ import structlog
4
+ from config import BROKER_CONFIG
5
+
6
+ logger = structlog.get_logger()
7
+
8
+ def process_order(message: Message):
9
+ logger.info(
10
+ "Processing order",
11
+ message_id=message.id,
12
+ order_id=message.payload.get("order_id"),
13
+ amount=message.payload.get("amount")
14
+ )
15
+ # Simulate processing time
16
+ time.sleep(0.5)
17
+ # Simulate occasional failures
18
+ if message.payload.get("amount", 0) > 900:
19
+ raise ValueError("Order amount too high")
20
+
21
+ def process_notification(message: Message):
22
+ logger.info(
23
+ "Processing notification",
24
+ message_id=message.id,
25
+ notification_type=message.payload.get("type"),
26
+ priority=message.payload.get("priority")
27
+ )
28
+ # Simulate processing time
29
+ time.sleep(0.3)
30
+ # Simulate occasional failures for high priority notifications
31
+ if message.payload.get("priority") == "high":
32
+ if message.retry_count < 2: # Will succeed on third try
33
+ raise ValueError("Failed to send high priority notification")
34
+
35
+ def process_analytics(message: Message):
36
+ logger.info(
37
+ "Processing analytics event",
38
+ message_id=message.id,
39
+ event_type=message.payload.get("event_type"),
40
+ user_id=message.payload.get("user_id")
41
+ )
42
+ # Simulate processing time
43
+ time.sleep(0.1)
44
+
45
+ def main():
46
+ # Create consumers for different queues
47
+ consumers = [
48
+ MessageConsumer(
49
+ config=BROKER_CONFIG,
50
+ queue="orders",
51
+ handler=process_order
52
+ ),
53
+ MessageConsumer(
54
+ config=BROKER_CONFIG,
55
+ queue="notifications",
56
+ handler=process_notification
57
+ ),
58
+ MessageConsumer(
59
+ config=BROKER_CONFIG,
60
+ queue="analytics",
61
+ handler=process_analytics
62
+ )
63
+ ]
64
+
65
+ try:
66
+ # Start all consumers
67
+ for consumer in consumers:
68
+ consumer.start()
69
+ logger.info(
70
+ "Started consumer",
71
+ queue=consumer.queue
72
+ )
73
+
74
+ # Keep the main thread running
75
+ while True:
76
+ time.sleep(1)
77
+ except KeyboardInterrupt:
78
+ logger.info("Shutting down consumers")
79
+ for consumer in consumers:
80
+ consumer.stop()
81
+
82
+ if __name__ == "__main__":
83
+ main()
message_generators.py ADDED
@@ -0,0 +1,40 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from abc import ABC, abstractmethod
2
+ from typing import Dict, Any
3
+ import random
4
+ import time
5
+
6
+ class MessageGenerator(ABC):
7
+ @abstractmethod
8
+ def generate_message(self) -> Dict[str, Any]:
9
+ pass
10
+
11
+ class OrderMessageGenerator(MessageGenerator):
12
+ def generate_message(self) -> Dict[str, Any]:
13
+ order_id = f"ORD-{random.randint(1000, 9999)}"
14
+ return {
15
+ "order_id": order_id,
16
+ "customer_id": f"CUST-{random.randint(100, 999)}",
17
+ "amount": round(random.uniform(10.0, 1000.0), 2),
18
+ "items": random.randint(1, 10),
19
+ "timestamp": time.time()
20
+ }
21
+
22
+ class NotificationMessageGenerator(MessageGenerator):
23
+ def generate_message(self) -> Dict[str, Any]:
24
+ return {
25
+ "notification_id": f"NOTIF-{random.randint(1000, 9999)}",
26
+ "user_id": f"USER-{random.randint(100, 999)}",
27
+ "type": random.choice(["email", "sms", "push"]),
28
+ "priority": random.choice(["high", "medium", "low"]),
29
+ "timestamp": time.time()
30
+ }
31
+
32
+ class AnalyticsMessageGenerator(MessageGenerator):
33
+ def generate_message(self) -> Dict[str, Any]:
34
+ return {
35
+ "event_id": f"EVT-{random.randint(1000, 9999)}",
36
+ "event_type": random.choice(["page_view", "click", "purchase", "login"]),
37
+ "user_id": f"USER-{random.randint(100, 999)}",
38
+ "session_id": f"SESSION-{random.randint(1000, 9999)}",
39
+ "timestamp": time.time()
40
+ }
publishers.py ADDED
@@ -0,0 +1,77 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from async_framework import MessageBroker, Message
2
+ import time
3
+ import random
4
+ from typing import Optional
5
+ import structlog
6
+ from config import BROKER_CONFIG
7
+ from message_generators import (
8
+ OrderMessageGenerator,
9
+ NotificationMessageGenerator,
10
+ AnalyticsMessageGenerator
11
+ )
12
+
13
+ logger = structlog.get_logger()
14
+
15
+ class Publisher:
16
+ def __init__(self, queue_name: str, message_generator, publish_interval: float = 1.0):
17
+ self.broker = MessageBroker(BROKER_CONFIG)
18
+ self.queue_name = queue_name
19
+ self.message_generator = message_generator
20
+ self.publish_interval = publish_interval
21
+ logger.info("Publisher initialized", queue=queue_name)
22
+
23
+ def run(self, num_messages: Optional[int] = None):
24
+ count = 0
25
+ try:
26
+ while num_messages is None or count < num_messages:
27
+ message_data = self.message_generator.generate_message()
28
+ message = self.broker.publish(
29
+ queue=self.queue_name,
30
+ payload=message_data
31
+ )
32
+ logger.info(
33
+ "Message published",
34
+ queue=self.queue_name,
35
+ message_id=message.id
36
+ )
37
+ count += 1
38
+ # Random delay between 0.5x and 1.5x of publish_interval
39
+ time.sleep(self.publish_interval * random.uniform(0.5, 1.5))
40
+ except KeyboardInterrupt:
41
+ logger.info("Publisher stopped", queue=self.queue_name)
42
+ except Exception as e:
43
+ logger.error("Error in publisher", queue=self.queue_name, error=str(e))
44
+
45
+ def main():
46
+ # Create publishers for different queues
47
+ publishers = [
48
+ Publisher("orders", OrderMessageGenerator(), 2.0),
49
+ Publisher("notifications", NotificationMessageGenerator(), 1.0),
50
+ Publisher("analytics", AnalyticsMessageGenerator(), 0.5)
51
+ ]
52
+
53
+ try:
54
+ # Run publishers in parallel using threads
55
+ import threading
56
+ threads = []
57
+ for publisher in publishers:
58
+ thread = threading.Thread(
59
+ target=publisher.run,
60
+ name=f"publisher-{publisher.queue_name}"
61
+ )
62
+ thread.daemon = True
63
+ thread.start()
64
+ threads.append(thread)
65
+ logger.info(
66
+ "Started publisher thread",
67
+ queue=publisher.queue_name
68
+ )
69
+
70
+ # Keep the main thread running
71
+ while True:
72
+ time.sleep(1)
73
+ except KeyboardInterrupt:
74
+ logger.info("Shutting down publishers")
75
+
76
+ if __name__ == "__main__":
77
+ main()