update to ack
Browse files
worker.py
CHANGED
|
@@ -83,7 +83,6 @@ class RabbitMQWorker:
|
|
| 83 |
logger.info("[Worker %s] Received message: %s, headers: %s", thread_id, body, headers)
|
| 84 |
|
| 85 |
try:
|
| 86 |
-
ch.basic_ack(delivery_tag=method.delivery_tag)
|
| 87 |
contexts = []
|
| 88 |
|
| 89 |
body_dict = json.loads(body)
|
|
@@ -116,6 +115,7 @@ class RabbitMQWorker:
|
|
| 116 |
# Publish results
|
| 117 |
if self.publish_message(body_dict, headers):
|
| 118 |
logger.info("[Worker %s] Successfully published results to ml_server.", thread_id)
|
|
|
|
| 119 |
else:
|
| 120 |
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
| 121 |
logger.error("[Worker %s] Failed to publish results.", thread_id)
|
|
@@ -123,7 +123,7 @@ class RabbitMQWorker:
|
|
| 123 |
logger.info("[Worker %s] Contexts: %s", thread_id, contexts)
|
| 124 |
|
| 125 |
else:
|
| 126 |
-
ch.basic_ack(delivery_tag=method.delivery_tag)
|
| 127 |
logger.warning("[Worker %s] Unknown pattern type in headers: %s", thread_id, pattern)
|
| 128 |
|
| 129 |
except Exception as e:
|
|
|
|
| 83 |
logger.info("[Worker %s] Received message: %s, headers: %s", thread_id, body, headers)
|
| 84 |
|
| 85 |
try:
|
|
|
|
| 86 |
contexts = []
|
| 87 |
|
| 88 |
body_dict = json.loads(body)
|
|
|
|
| 115 |
# Publish results
|
| 116 |
if self.publish_message(body_dict, headers):
|
| 117 |
logger.info("[Worker %s] Successfully published results to ml_server.", thread_id)
|
| 118 |
+
ch.basic_ack(delivery_tag=method.delivery_tag)
|
| 119 |
else:
|
| 120 |
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
| 121 |
logger.error("[Worker %s] Failed to publish results.", thread_id)
|
|
|
|
| 123 |
logger.info("[Worker %s] Contexts: %s", thread_id, contexts)
|
| 124 |
|
| 125 |
else:
|
| 126 |
+
ch.basic_ack(delivery_tag=method.delivery_tag, requeue=False)
|
| 127 |
logger.warning("[Worker %s] Unknown pattern type in headers: %s", thread_id, pattern)
|
| 128 |
|
| 129 |
except Exception as e:
|