demo / src /project /config.py
ElmiraManavi
make service publish-only | create Testing page | refactoring
4e9265f
import os
import sys
MODE_CONSUME = 'consume'
MODE_CONSUME_AND_PUBLISH = 'consume-and-publish'
MODE_PUBLISH = 'publish'
class Config:
@staticmethod
def __assert_variable_is_defined(value: ..., message: str) -> None:
if value is None:
# https://www.geeksforgeeks.org/python-exit-commands-quit-exit-sys-exit-and-os-_exit/
print(message)
# https://docs.python.org/3/library/sys.html#sys.exit
sys.exit(1)
def __init__(self) -> None:
self.mq_host = os.environ.get('MQ_HOST', 'mq-service')
self.mq_user = os.environ.get('MQ_USER')
self.mq_password = os.environ.get('MQ_PASSWORD')
# different modes:
self.queue_mode = os.environ.get('QUEUE_MODE', MODE_PUBLISH).lower()
print(self.queue_mode)
# consume: take elements from a queue (alias pull, alias store)
if self.queue_mode in [MODE_CONSUME, 'pull', 'store', ]:
self.queue_mode = MODE_CONSUME
# publish: emit elements to a queue (alias push, alias load)
if self.queue_mode in [MODE_PUBLISH, 'push', 'load', ]:
self.queue_mode = MODE_PUBLISH
# consume-and-publish: take elements from a queue, (store the last COUNT), and publish to another queue
if self.queue_mode in [MODE_CONSUME_AND_PUBLISH, 'pull-push', 'store-n-forward', ]:
self.queue_mode = MODE_CONSUME_AND_PUBLISH
self.queue_dest_test_trigger= os.environ.get('QUEUE_DEST_TEST_TRIGGER')
self.queue_dest_pipeline_trigger = os.environ.get('QUEUE_DEST_PIPELINE_TRIGGER')
self.data_directory = os.environ.get('DATA_DIRECTORY', '/storage/data/')
# if mode is consume-and-publish the data_index file will be used for consume and will have a suffix ".published"
self.data_index = os.environ.get('DATA_INDEX', '/storage/index')
# asserts
self.__assert_variable_is_defined(self.mq_user, "User not specified in MQ_USER")
self.__assert_variable_is_defined(self.mq_password, "Password not specified in MQ_PASSWORD")
if self.is_start_publish():
self.__assert_variable_is_defined(self.queue_dest_test_trigger, "Destination queue is not specified in QUEUE_DEST_TEST_TRIGGER")
self.__assert_variable_is_defined(self.queue_dest_pipeline_trigger, "Destination queue is not specified in QUEUE_DEST_PIPELINE_TRIGGER")
def display(self) -> None:
for name in sorted(dir(self)):
if not name.startswith('_') and name not in [
'display',
'is_start_consume',
'is_start_publish',
'mq_password',
]:
print(f"{name.upper()}={getattr(self, name)}")
print()
def is_start_publish(self) -> bool:
return self.queue_mode in [MODE_PUBLISH, MODE_CONSUME_AND_PUBLISH, ]