ElmiraManavi commited on
Commit
4e9265f
·
1 Parent(s): 8cf2949

make service publish-only | create Testing page | refactoring

Browse files
requirements.txt CHANGED
Binary files a/requirements.txt and b/requirements.txt differ
 
src/pages/Eventportal.py CHANGED
@@ -1,12 +1,10 @@
1
- import os
2
  from datetime import datetime
 
3
 
4
  from bson import ObjectId
5
  from streamlit import streamlit as st
6
 
7
- from dotenv import load_dotenv
8
- from pymongo import MongoClient
9
- from itertools import batched
10
 
11
  st.set_page_config(layout="wide")
12
 
@@ -26,23 +24,16 @@ st.markdown(
26
  review_options = [":green[:material/done:]", ":red[:material/highlight_off:]"]
27
  review_filter_options = ["Abgeschlossene Reviews", "Offene Reviews"]
28
  review_fields = [
29
- ("Titel", "title_valid"),
30
- ("Kategorie", "category_valid"),
31
- ("Subkategorie", "format_valid"),
32
- ("Startdatum", "startdate_valid"),
33
- ("Startzeit", "starttime_valid"),
34
- ("Enddatum", "enddate_valid"),
35
- ("Endzeit", "endtime_valid"),
36
- ("Adresse", "address_valid"),
37
- ("Link", "url_valid")
38
- ]
39
-
40
- @st.cache_resource
41
- def init_connection():
42
- load_dotenv()
43
- uri = f"mongodb+srv://{os.getenv('MONGO_USERNAME')}:{os.getenv('MONGO_PASSWORD')}@{os.getenv('MONGO_HOST')}/?retryWrites=true&w=majority&appName=Cluster0"
44
- client = MongoClient(uri)
45
- return client.event_data
46
 
47
 
48
  @st.cache_resource
@@ -183,9 +174,9 @@ def display_event(event_data):
183
 
184
  if comments:
185
  st.write("**Kommentare**")
186
- for comment_id,comment in enumerate(comments):
187
  st.write(f"💬 {comment}")
188
- st.chat_input("Neuer Kommentar" , key="comment_input" + str(event_data["_id"]),
189
  on_submit=update_review_comment,
190
  args=(str(event_data["_id"]),))
191
 
@@ -193,8 +184,6 @@ def display_event(event_data):
193
  def render_review_controls(event_data):
194
  """Erzeugt dynamisch alle Review-Segmented-Controls für ein Event."""
195
 
196
-
197
-
198
  # Dynamisch Spalten erzeugen
199
  cols = st.columns(len(review_fields))
200
 
@@ -335,13 +324,13 @@ batches = st.session_state.batched_events
335
 
336
  # Page Content
337
 
338
- col_rev_toggle, col_control1 = st.columns([1,4])
339
  with col_rev_toggle:
340
  st.toggle("Review Mode", value=False, on_change=toggle_review_mode)
341
  if st.session_state.review_mode:
342
  with col_control1:
343
- st.pills("Filter Reviews", review_filter_options, key="filter_reviews_input", label_visibility="collapsed", on_change=update_filter)
344
-
345
 
346
  st.title("Eventportal")
347
 
 
 
1
  from datetime import datetime
2
+ from itertools import batched
3
 
4
  from bson import ObjectId
5
  from streamlit import streamlit as st
6
 
7
+ from services.db_service import init_connection
 
 
8
 
9
  st.set_page_config(layout="wide")
10
 
 
24
  review_options = [":green[:material/done:]", ":red[:material/highlight_off:]"]
25
  review_filter_options = ["Abgeschlossene Reviews", "Offene Reviews"]
26
  review_fields = [
27
+ ("Titel", "title_valid"),
28
+ ("Kategorie", "category_valid"),
29
+ ("Subkategorie", "format_valid"),
30
+ ("Startdatum", "startdate_valid"),
31
+ ("Startzeit", "starttime_valid"),
32
+ ("Enddatum", "enddate_valid"),
33
+ ("Endzeit", "endtime_valid"),
34
+ ("Adresse", "address_valid"),
35
+ ("Link", "url_valid")
36
+ ]
 
 
 
 
 
 
 
37
 
38
 
39
  @st.cache_resource
 
174
 
175
  if comments:
176
  st.write("**Kommentare**")
177
+ for comment_id, comment in enumerate(comments):
178
  st.write(f"💬 {comment}")
179
+ st.chat_input("Neuer Kommentar", key="comment_input" + str(event_data["_id"]),
180
  on_submit=update_review_comment,
181
  args=(str(event_data["_id"]),))
182
 
 
184
  def render_review_controls(event_data):
185
  """Erzeugt dynamisch alle Review-Segmented-Controls für ein Event."""
186
 
 
 
187
  # Dynamisch Spalten erzeugen
188
  cols = st.columns(len(review_fields))
189
 
 
324
 
325
  # Page Content
326
 
327
+ col_rev_toggle, col_control1 = st.columns([1, 4])
328
  with col_rev_toggle:
329
  st.toggle("Review Mode", value=False, on_change=toggle_review_mode)
330
  if st.session_state.review_mode:
331
  with col_control1:
332
+ st.pills("Filter Reviews", review_filter_options, key="filter_reviews_input", label_visibility="collapsed",
333
+ on_change=update_filter)
334
 
335
  st.title("Eventportal")
336
 
src/pages/Pipeline_Demo.py CHANGED
@@ -1,43 +1,15 @@
1
- import os
2
- import logging
3
- import threading
4
- from typing import Callable
5
-
6
  import streamlit as st
7
  from dotenv import load_dotenv
8
 
9
- from project import Config, DemoThread
10
- from project.adapters import InAdapter, OutAdapter, MessageQueueOutAdapterImpl
11
-
12
- logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
13
- logger = logging.getLogger(__name__)
14
 
15
  load_dotenv()
16
 
17
-
18
- @st.cache_resource
19
- def create_project_thread(_config: Config) -> DemoThread:
20
- in_adapter_injector: Callable[[Config], InAdapter] = lambda c: InAdapter()
21
-
22
- if config.mq_user == "local":
23
- logger.info("Running with local OutAdapter (no MQ connection).")
24
- out_adapter_injector: Callable[[Config], OutAdapter] = lambda c: OutAdapter()
25
- else:
26
- logger.info(f"Using real MessageQueueOutAdapter -> {config.queue_dest}")
27
- out_adapter_injector: Callable[[Config], OutAdapter] = lambda c: MessageQueueOutAdapterImpl(c, c.queue_dest)
28
-
29
- return DemoThread(config, None, in_adapter_injector, out_adapter_injector)
30
-
31
-
32
  config = Config()
33
  config.display()
34
 
35
- publish_thread = create_project_thread(config)
36
-
37
- if config.is_start_publish():
38
- logger.info("Starting MQ consume thread...")
39
- consume_thread = threading.Thread(target=publish_thread.run, daemon=True)
40
- consume_thread.start()
41
 
42
  st.title("Pipeline Demo")
43
  st.markdown("Sende eine Url an die Message Queue um die Pipeline zu starten:")
@@ -47,11 +19,9 @@ user_input = st.text_input("URL eingeben:")
47
  if st.button("Senden"):
48
  if user_input:
49
  try:
50
-
51
- publish_thread._DemoThread__queue_dest.send_message(user_input.strip())
52
  st.info("URL wurde gesendet")
53
- logger.info(f"Message sent to MQ: {user_input}")
54
  except Exception as e:
55
- logger.exception("Fehler beim MQ-Senden")
56
  else:
57
  st.warning("Bitte eine Nachricht eingeben, bevor du sendest.")
 
 
 
 
 
 
1
  import streamlit as st
2
  from dotenv import load_dotenv
3
 
4
+ from project import Config
5
+ from services.mq_service import create_out_adapter
 
 
 
6
 
7
  load_dotenv()
8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
  config = Config()
10
  config.display()
11
 
12
+ out_adapter = create_out_adapter(config, config.queue_dest_pipeline_trigger)
 
 
 
 
 
13
 
14
  st.title("Pipeline Demo")
15
  st.markdown("Sende eine Url an die Message Queue um die Pipeline zu starten:")
 
19
  if st.button("Senden"):
20
  if user_input:
21
  try:
22
+ out_adapter.send_message(user_input.strip())
 
23
  st.info("URL wurde gesendet")
 
24
  except Exception as e:
25
+ st.error(f"Fehler beim senden in die Message Queue: {e}")
26
  else:
27
  st.warning("Bitte eine Nachricht eingeben, bevor du sendest.")
src/pages/Testing.py ADDED
@@ -0,0 +1,63 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import base64
2
+ import json
3
+
4
+ import streamlit as st
5
+ from dotenv import load_dotenv
6
+
7
+ from project import Config
8
+ from services.db_service import init_connection
9
+ from services.mq_service import create_out_adapter
10
+
11
+ load_dotenv()
12
+ config = Config()
13
+ config.display()
14
+ out_adapter = create_out_adapter(config, config.queue_dest_test_trigger)
15
+
16
+ db = init_connection()
17
+
18
+ st.title("Testing")
19
+ st.write(f"Es sind {db.testdata_1.count_documents({})} Testdatensätze vorhanden.")
20
+
21
+ with st.form("Testkonfiguration"):
22
+ batch_id = st.selectbox("Testbatch wählen", [0, 1, 2, 3])
23
+
24
+ start_test = st.form_submit_button("Starte Test")
25
+
26
+ if start_test:
27
+ testbatch = list(db.testdata_1.find({"batch_id": batch_id}))
28
+ batch_size = len(testbatch)
29
+ doc = db.test_evaluation.find_one({}, sort=[("test_number", -1)])
30
+ test_number = doc.get("test_number") + 1 if doc else 0
31
+
32
+ st.write(f"Starte Test mit {len(testbatch)} Testdaten, Test-ID: {test_number}")
33
+ insert_result = db.test_evaluation.insert_one(
34
+ {"test_number": test_number, "batch_size": batch_size, "batch_id": batch_id, "status": "Pending"})
35
+ test_id = insert_result.inserted_id if insert_result else None
36
+ if test_id:
37
+ for testdata in testbatch:
38
+ id = str(testdata["_id"])
39
+ url = testdata.get("url")
40
+ html = testdata.get("html").decode("utf-8")
41
+ data = testdata.get("data")
42
+ st.write(id)
43
+ st.write(url)
44
+ st.write("___")
45
+
46
+ try:
47
+ message = {
48
+ "url": url,
49
+ "body": base64.b64encode(html.encode("utf-8")).decode("utf-8"),
50
+ "meta": [{
51
+ "service_name": "demo-service",
52
+ "queue_dest": "storehtml-out",
53
+ "test_data": {"test_id": str(test_id), "test_number": test_number, "batch_size": batch_size,
54
+ "batch_id": batch_id, "data_id": id},
55
+ }]
56
+ }
57
+ encoded = json.dumps(message).encode("utf-8")
58
+ out_adapter.send_message(encoded)
59
+
60
+ except Exception as e:
61
+ st.error(f"Fehler beim senden in die Message Queue: {e}")
62
+ else:
63
+ st.error("Fehler beim anlegen des Tests")
src/project/__init__.py CHANGED
@@ -1,2 +1 @@
1
  from .config import Config
2
- from .demo_thread import DemoThread
 
1
  from .config import Config
 
src/project/adapters/__init__.py CHANGED
@@ -1,4 +1,2 @@
1
- from .in_adapter import InAdapter
2
- from .message_queue_in_adapter_impl import MessageQueueInAdapterImpl
3
  from .message_queue_out_adapter_impl import MessageQueueOutAdapterImpl
4
  from .out_adapter import OutAdapter
 
 
 
1
  from .message_queue_out_adapter_impl import MessageQueueOutAdapterImpl
2
  from .out_adapter import OutAdapter
src/project/adapters/in_adapter.py DELETED
@@ -1,12 +0,0 @@
1
- from abc import abstractmethod
2
- from typing import Callable
3
-
4
- from pika import spec
5
- from pika.adapters.blocking_connection import BlockingChannel
6
-
7
-
8
- class InAdapter:
9
-
10
- @abstractmethod
11
- def start_consuming(self, callback: Callable[[BlockingChannel, spec.Basic.Deliver, spec.BasicProperties, bytes], None]) -> None:
12
- pass
 
 
 
 
 
 
 
 
 
 
 
 
 
src/project/adapters/message_queue_in_adapter_impl.py DELETED
@@ -1,19 +0,0 @@
1
- from typing import Callable
2
-
3
- from pika import spec
4
- from pika.adapters.blocking_connection import BlockingChannel
5
-
6
- from project import Config
7
- from project.adapters.in_adapter import InAdapter
8
- from wrapper import MessageQueue
9
-
10
-
11
- class MessageQueueInAdapterImpl(InAdapter):
12
-
13
- def __init__(self, config: Config, queue_name: str) -> None:
14
- self.__config = config
15
- # initialize queue
16
- self.__queue = MessageQueue(config.mq_host, config.mq_user, config.mq_password, queue_name)
17
-
18
- def start_consuming(self, callback: Callable[[BlockingChannel, spec.Basic.Deliver, spec.BasicProperties, bytes], None]) -> None:
19
- self.__queue.start_consuming(callback)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/project/config.py CHANGED
@@ -21,7 +21,7 @@ class Config:
21
  self.mq_password = os.environ.get('MQ_PASSWORD')
22
 
23
  # different modes:
24
- self.queue_mode = os.environ.get('QUEUE_MODE', MODE_CONSUME).lower()
25
  print(self.queue_mode)
26
  # consume: take elements from a queue (alias pull, alias store)
27
  if self.queue_mode in [MODE_CONSUME, 'pull', 'store', ]:
@@ -33,8 +33,8 @@ class Config:
33
  if self.queue_mode in [MODE_CONSUME_AND_PUBLISH, 'pull-push', 'store-n-forward', ]:
34
  self.queue_mode = MODE_CONSUME_AND_PUBLISH
35
 
36
- self.queue_src = os.environ.get('QUEUE_SRC')
37
- self.queue_dest = os.environ.get('QUEUE_DEST')
38
  self.data_directory = os.environ.get('DATA_DIRECTORY', '/storage/data/')
39
  # if mode is consume-and-publish the data_index file will be used for consume and will have a suffix ".published"
40
  self.data_index = os.environ.get('DATA_INDEX', '/storage/index')
@@ -43,9 +43,9 @@ class Config:
43
  self.__assert_variable_is_defined(self.mq_user, "User not specified in MQ_USER")
44
  self.__assert_variable_is_defined(self.mq_password, "Password not specified in MQ_PASSWORD")
45
  if self.is_start_publish():
46
- self.__assert_variable_is_defined(self.queue_dest, "Destination queue is not specified in QUEUE_DEST")
47
- if self.is_start_consume():
48
- self.__assert_variable_is_defined(self.queue_src, "Source queue is not specified in QUEUE_SRC")
49
 
50
  def display(self) -> None:
51
  for name in sorted(dir(self)):
@@ -58,8 +58,6 @@ class Config:
58
  print(f"{name.upper()}={getattr(self, name)}")
59
  print()
60
 
61
- def is_start_consume(self) -> bool:
62
- return self.queue_mode in [MODE_CONSUME, MODE_CONSUME_AND_PUBLISH, ]
63
 
64
  def is_start_publish(self) -> bool:
65
  return self.queue_mode in [MODE_PUBLISH, MODE_CONSUME_AND_PUBLISH, ]
 
21
  self.mq_password = os.environ.get('MQ_PASSWORD')
22
 
23
  # different modes:
24
+ self.queue_mode = os.environ.get('QUEUE_MODE', MODE_PUBLISH).lower()
25
  print(self.queue_mode)
26
  # consume: take elements from a queue (alias pull, alias store)
27
  if self.queue_mode in [MODE_CONSUME, 'pull', 'store', ]:
 
33
  if self.queue_mode in [MODE_CONSUME_AND_PUBLISH, 'pull-push', 'store-n-forward', ]:
34
  self.queue_mode = MODE_CONSUME_AND_PUBLISH
35
 
36
+ self.queue_dest_test_trigger= os.environ.get('QUEUE_DEST_TEST_TRIGGER')
37
+ self.queue_dest_pipeline_trigger = os.environ.get('QUEUE_DEST_PIPELINE_TRIGGER')
38
  self.data_directory = os.environ.get('DATA_DIRECTORY', '/storage/data/')
39
  # if mode is consume-and-publish the data_index file will be used for consume and will have a suffix ".published"
40
  self.data_index = os.environ.get('DATA_INDEX', '/storage/index')
 
43
  self.__assert_variable_is_defined(self.mq_user, "User not specified in MQ_USER")
44
  self.__assert_variable_is_defined(self.mq_password, "Password not specified in MQ_PASSWORD")
45
  if self.is_start_publish():
46
+ self.__assert_variable_is_defined(self.queue_dest_test_trigger, "Destination queue is not specified in QUEUE_DEST_TEST_TRIGGER")
47
+ self.__assert_variable_is_defined(self.queue_dest_pipeline_trigger, "Destination queue is not specified in QUEUE_DEST_PIPELINE_TRIGGER")
48
+
49
 
50
  def display(self) -> None:
51
  for name in sorted(dir(self)):
 
58
  print(f"{name.upper()}={getattr(self, name)}")
59
  print()
60
 
 
 
61
 
62
  def is_start_publish(self) -> bool:
63
  return self.queue_mode in [MODE_PUBLISH, MODE_CONSUME_AND_PUBLISH, ]
src/project/demo_thread.py DELETED
@@ -1,48 +0,0 @@
1
- import ast
2
- from typing import Callable, Union
3
-
4
- from project.adapters import InAdapter, OutAdapter
5
- from wrapper import HealthCheck
6
- from . import Config
7
-
8
-
9
- class DemoThread:
10
- def __init__(
11
- self,
12
- config: Config,
13
- health_check: Union[HealthCheck, None],
14
- in_adapter_injector: Callable[[Config], InAdapter],
15
- out_adapter_injector: Callable[[Config], OutAdapter]
16
- ) -> None:
17
- """
18
- Create BoilerplateRemovalThread instance.
19
- :param config: configuration review_options
20
- :param health_check: health check
21
- :param in_adapter_injector: input adapter, based on config
22
- :param out_adapter_injector: output adapter, based on config
23
- """
24
- self.__config = config
25
- self.__health_check = health_check
26
- # initialize queues
27
- self.__queue_src = in_adapter_injector(config)
28
- self.__queue_dest = out_adapter_injector(config)
29
-
30
- def handle_element(self, ch, method, properties, url: bytes) -> None:
31
- print(f"CONSUME: Received element {properties}: {len(url)}")
32
-
33
- try:
34
-
35
- self.__queue_dest.send_message(bytes(str(url), encoding="utf-8"))
36
- print(f"CONSUME: Send element {properties}")
37
-
38
- except Exception as e:
39
- print("Error", e)
40
-
41
- def run(self) -> None:
42
- # consume elements
43
- # TODO: are these callbacks called only one at a time or in multiple threads?
44
- # self.__handle_element("ch", "method", "properties", bytes("localhost/health", "utf-8"))
45
- # self.__handle_element("ch", "method", "properties", bytes("localhost/health", "utf-8"))
46
- self.__queue_src.start_consuming(self.handle_element)
47
- # print("CONSUME iteration")
48
- # time.sleep(1)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/services/db_service.py ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+
3
+ import streamlit as st
4
+ from dotenv import load_dotenv
5
+ from pymongo import MongoClient
6
+
7
+
8
+ @st.cache_resource
9
+ def init_connection():
10
+ load_dotenv()
11
+ uri = f"mongodb+srv://{os.getenv('MONGO_USERNAME')}:{os.getenv('MONGO_PASSWORD')}@{os.getenv('MONGO_HOST')}/?retryWrites=true&w=majority&appName=Cluster0"
12
+ client = MongoClient(uri)
13
+ return client.event_data
src/services/mq_service.py ADDED
@@ -0,0 +1,17 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import streamlit as st
2
+
3
+ from project import Config
4
+ from project.adapters import MessageQueueOutAdapterImpl, OutAdapter
5
+
6
+
7
+ class PrintingOutAdapter(OutAdapter):
8
+ def send_message(self, message):
9
+ print(f"Publishing message {len(message)}")
10
+
11
+
12
+ @st.cache_resource
13
+ def create_out_adapter(_config: Config, queue_dest: str) -> OutAdapter:
14
+ if _config.mq_user == "local":
15
+ return PrintingOutAdapter()
16
+ else:
17
+ return MessageQueueOutAdapterImpl(_config, queue_dest)
src/wrapper/__init__.py CHANGED
@@ -1,2 +1 @@
1
- from .health_check import HealthCheck
2
  from .message_queue import MessageQueue
 
 
1
  from .message_queue import MessageQueue
src/wrapper/health_check.py DELETED
@@ -1,73 +0,0 @@
1
- import logging
2
- from typing import Callable, Dict
3
-
4
- from flask import Flask
5
- from flask.typing import ResponseValue
6
-
7
-
8
- # adapted from https://stackoverflow.com/questions/62000942/flask-block-specific-endpoints-from-logging
9
- class AjaxFilter(logging.Filter):
10
- def filter(self, record):
11
- return "/health" not in record.getMessage()
12
-
13
-
14
- class HealthCheck:
15
- # adapted from https://thelinuxnotes.com/index.php/implementing-a-flask-health-check-and-kubernetes-liveness-probe-in-python-application/
16
- def __do_check(self) -> tuple[ResponseValue, int]:
17
- # https://docs.python.org/3.9/library/functions.html#all
18
- # https://stackoverflow.com/questions/12229064/mapping-over-values-in-a-python-dictionary
19
- # loop over checks
20
- # execute check method for each key
21
- results = {k: v() for k, v in self.__checks.items()}
22
- overall_result = all(results.values())
23
- json = {
24
- "state": "OK" if overall_result else "ERROR",
25
- "elements": results
26
- }
27
- status_code = 200 if overall_result else 500
28
- return json, status_code
29
-
30
- def __init__(self, app: Flask) -> None:
31
- self.__app = app
32
- self.__checks: Dict[str, Callable[[], bool]] = {}
33
- self.__log = logging.getLogger('werkzeug')
34
- self.disable_logging_filter = AjaxFilter()
35
-
36
- # register route dynamically
37
- # https://pytutorial.com/flask-appadd_url_rule-dynamic-url-registration-guide/
38
- # @app.route('/health')
39
- # @app.route('/health/')
40
- self.__app.add_url_rule(
41
- '/health', # URL rule with variable
42
- 'health_check', # Endpoint name
43
- view_func=self.__do_check, # View function
44
- methods=['GET'], # Allowed methods
45
- strict_slashes=False # URL trailing slash handling
46
- )
47
-
48
- def disable_logging(self) -> None:
49
- print("health check logging disabled")
50
- self.__log.addFilter(self.disable_logging_filter)
51
-
52
- def enable_logging(self) -> None:
53
- print("health check logging enabled")
54
- self.__log.removeFilter(self.disable_logging_filter)
55
-
56
- def add_check(self, name, method: Callable[[], bool]) -> None:
57
- """method to add named check methods to health check
58
- Parameters
59
- ----------
60
- name : str
61
- named key of check method
62
- method : function
63
- method which returns a bool for the health check
64
- """
65
- self.__checks.update({f"{name}": method})
66
-
67
- def wrap_exception_handler(self, name: str, method: Callable[[], None]) -> None:
68
- """wrap exception handler around method and register simple health check for it"""
69
- self.add_check(name, lambda: True)
70
- try:
71
- method()
72
- finally:
73
- self.add_check(name, lambda: False)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/wrapper/message_queue.py CHANGED
@@ -42,6 +42,7 @@ class MessageQueue:
42
  properties=pika.BasicProperties(
43
  delivery_mode=pika.DeliveryMode.Persistent
44
  ))
 
45
 
46
  # try repeating once, if connection was lost
47
  retry = True
@@ -60,21 +61,13 @@ class MessageQueue:
60
  # connection.close()
61
 
62
  def __ack_and_call_callback(
63
- self,
64
- ch: BlockingChannel,
65
- method: spec.Basic.Deliver,
66
- properties: spec.BasicProperties,
67
- body: bytes) -> None:
68
  # acknowledge processing directly
69
  ch.basic_ack(delivery_tag=method.delivery_tag)
70
  # print(f" [x] Received {body.decode()}")
71
  self.__callback(ch, method, properties, body)
72
  # print(" [x] Done")
73
-
74
- def start_consuming(self, callback: Callable[[BlockingChannel, spec.Basic.Deliver, spec.BasicProperties, bytes], None]) -> None:
75
- """callback method parameter is a method with the following signature:
76
- def callback(ch, method, properties, body):"""
77
- self.__callback = callback
78
- self.__channel.basic_qos(prefetch_count=1)
79
- self.__channel.basic_consume(queue=self.__queue_name, on_message_callback=self.__ack_and_call_callback)
80
- self.__channel.start_consuming()
 
42
  properties=pika.BasicProperties(
43
  delivery_mode=pika.DeliveryMode.Persistent
44
  ))
45
+ print(f"[PUBLISH] Queued message in {self.__queue_name}")
46
 
47
  # try repeating once, if connection was lost
48
  retry = True
 
61
  # connection.close()
62
 
63
  def __ack_and_call_callback(
64
+ self,
65
+ ch: BlockingChannel,
66
+ method: spec.Basic.Deliver,
67
+ properties: spec.BasicProperties,
68
+ body: bytes) -> None:
69
  # acknowledge processing directly
70
  ch.basic_ack(delivery_tag=method.delivery_tag)
71
  # print(f" [x] Received {body.decode()}")
72
  self.__callback(ch, method, properties, body)
73
  # print(" [x] Done")