import os
import sys

MODE_CONSUME = 'consume'
MODE_CONSUME_AND_PUBLISH = 'consume-and-publish'
MODE_PUBLISH = 'publish'

# Accepted (already lower-cased) QUEUE_MODE spellings -> canonical mode.
_MODE_ALIASES = {
    # consume: take elements from a queue (alias pull, alias store)
    MODE_CONSUME: MODE_CONSUME,
    'pull': MODE_CONSUME,
    'store': MODE_CONSUME,
    # publish: emit elements to a queue (alias push, alias load)
    MODE_PUBLISH: MODE_PUBLISH,
    'push': MODE_PUBLISH,
    'load': MODE_PUBLISH,
    # consume-and-publish: take elements from a queue, (store the last COUNT),
    # and publish to another queue
    MODE_CONSUME_AND_PUBLISH: MODE_CONSUME_AND_PUBLISH,
    'pull-push': MODE_CONSUME_AND_PUBLISH,
    'store-n-forward': MODE_CONSUME_AND_PUBLISH,
}

# Attribute names that display() must never print.
_HIDDEN_ATTRIBUTES = frozenset({'mq_password'})


class Config:
    """Settings read from environment variables at construction time.

    Attributes populated by ``__init__``:
        mq_host: ``MQ_HOST``, default ``'mq-service'``.
        mq_user: ``MQ_USER`` (required).
        mq_password: ``MQ_PASSWORD`` (required, never shown by ``display``).
        queue_mode: ``QUEUE_MODE`` lower-cased and mapped through the known
            aliases; defaults to ``MODE_PUBLISH``.
        queue_dest_test_trigger: ``QUEUE_DEST_TEST_TRIGGER``.
        queue_dest_pipeline_trigger: ``QUEUE_DEST_PIPELINE_TRIGGER``.
        data_directory: ``DATA_DIRECTORY``, default ``'/storage/data/'``.
        data_index: ``DATA_INDEX``, default ``'/storage/index'``.

    A missing required variable prints a message and terminates the process
    with exit status 1.
    """

    @staticmethod
    def __assert_variable_is_defined(value: object, message: str) -> None:
        """Print *message* and exit with status 1 when *value* is ``None``.

        Raises:
            SystemExit: with code 1 if *value* is ``None``.
        """
        if value is None:
            # https://www.geeksforgeeks.org/python-exit-commands-quit-exit-sys-exit-and-os-_exit/
            print(message)
            # https://docs.python.org/3/library/sys.html#sys.exit
            sys.exit(1)

    def __init__(self) -> None:
        """Load every setting from ``os.environ`` and validate the required ones.

        Raises:
            SystemExit: with code 1 when ``MQ_USER`` or ``MQ_PASSWORD`` is
                unset, or when a publishing mode is selected and either
                destination queue variable is unset.
        """
        self.mq_host = os.environ.get('MQ_HOST', 'mq-service')
        self.mq_user = os.environ.get('MQ_USER')
        self.mq_password = os.environ.get('MQ_PASSWORD')

        # different modes:
        requested_mode = os.environ.get('QUEUE_MODE', MODE_PUBLISH).lower()
        self.queue_mode = requested_mode
        # Echo the requested mode exactly as given (lower-cased, before
        # aliases are resolved).
        print(self.queue_mode)
        # NOTE(review): a value that is not a known mode or alias is kept
        # verbatim; is_start_publish() then returns False for it.
        self.queue_mode = _MODE_ALIASES.get(requested_mode, requested_mode)

        self.queue_dest_test_trigger = os.environ.get('QUEUE_DEST_TEST_TRIGGER')
        self.queue_dest_pipeline_trigger = os.environ.get('QUEUE_DEST_PIPELINE_TRIGGER')

        self.data_directory = os.environ.get('DATA_DIRECTORY', '/storage/data/')
        # if mode is consume-and-publish the data_index file will be used for
        # consume and will have a suffix ".published"
        self.data_index = os.environ.get('DATA_INDEX', '/storage/index')

        # asserts
        self.__assert_variable_is_defined(self.mq_user, "User not specified in MQ_USER")
        self.__assert_variable_is_defined(self.mq_password, "Password not specified in MQ_PASSWORD")
        if self.is_start_publish():
            # Destination queues are only mandatory when this instance publishes.
            self.__assert_variable_is_defined(
                self.queue_dest_test_trigger,
                "Destination queue is not specified in QUEUE_DEST_TEST_TRIGGER",
            )
            self.__assert_variable_is_defined(
                self.queue_dest_pipeline_trigger,
                "Destination queue is not specified in QUEUE_DEST_PIPELINE_TRIGGER",
            )

    def display(self) -> None:
        """Print every public, non-secret setting as ``NAME=value``.

        Names are printed upper-cased in sorted order, followed by one empty
        line. Private names, callables (methods) and ``mq_password`` are
        skipped. Filtering on ``callable`` rather than on a fixed list of
        method names keeps newly added methods out of the output.
        """
        for name in sorted(dir(self)):
            if name.startswith('_') or name in _HIDDEN_ATTRIBUTES:
                continue
            value = getattr(self, name)
            if callable(value):
                continue
            print(f"{name.upper()}={value}")
        print()

    def is_start_publish(self) -> bool:
        """Return ``True`` when the configured mode publishes to a queue."""
        return self.queue_mode in (MODE_PUBLISH, MODE_CONSUME_AND_PUBLISH)