| | import os |
| | import sys |
| |
|
# Canonical queue-mode names. Config normalises the QUEUE_MODE environment
# variable (or one of its aliases) to exactly one of these values.
MODE_CONSUME = "consume"
MODE_CONSUME_AND_PUBLISH = "consume-and-publish"
MODE_PUBLISH = "publish"
| |
|
| |
|
class Config:
    """Runtime configuration loaded from environment variables.

    Environment variables read by the constructor:

    * ``MQ_HOST`` (default ``'mq-service'``).
    * ``MQ_USER`` and ``MQ_PASSWORD`` -- both are required.
    * ``QUEUE_MODE`` -- one of the ``MODE_*`` constants or an alias,
      matched case-insensitively (default ``MODE_PUBLISH``).
    * ``QUEUE_DEST_TEST_TRIGGER`` and ``QUEUE_DEST_PIPELINE_TRIGGER`` --
      required only when the mode includes publishing.
    * ``DATA_DIRECTORY`` (default ``'/storage/data/'``) and ``DATA_INDEX``
      (default ``'/storage/index'``).

    A missing required variable or an unrecognised ``QUEUE_MODE`` writes a
    message to stderr and terminates the process with exit status 1.
    """

    @staticmethod
    def __assert_variable_is_defined(value: object, message: str) -> None:
        """Terminate the process with status 1 when *value* is ``None``.

        *message* is written to stderr before exiting. Only ``None`` counts
        as undefined; an empty string passes the check.
        """
        if value is None:
            print(message, file=sys.stderr)
            sys.exit(1)

    @staticmethod
    def _resolve_queue_mode(raw_mode: str) -> str:
        """Translate *raw_mode* (canonical name or alias) to a ``MODE_*`` value.

        Matching ignores case and surrounding whitespace. An unrecognised
        value is a configuration error: it is reported on stderr and the
        process exits with status 1 instead of continuing in a mode that
        neither consumes nor publishes.
        """
        # Built at call time so the class body itself does not depend on the
        # module-level constants being defined before the class statement.
        aliases = {
            MODE_CONSUME: MODE_CONSUME,
            'pull': MODE_CONSUME,
            'store': MODE_CONSUME,
            MODE_PUBLISH: MODE_PUBLISH,
            'push': MODE_PUBLISH,
            'load': MODE_PUBLISH,
            MODE_CONSUME_AND_PUBLISH: MODE_CONSUME_AND_PUBLISH,
            'pull-push': MODE_CONSUME_AND_PUBLISH,
            'store-n-forward': MODE_CONSUME_AND_PUBLISH,
        }
        normalized = raw_mode.strip().lower()
        if normalized not in aliases:
            expected = ', '.join(sorted(aliases))
            print(
                f"Unknown queue mode {raw_mode!r} specified in QUEUE_MODE "
                f"(expected one of: {expected})",
                file=sys.stderr,
            )
            sys.exit(1)
        return aliases[normalized]

    def __init__(self) -> None:
        """Read all settings from the environment and validate them."""
        env = os.environ

        self.mq_host = env.get('MQ_HOST', 'mq-service')
        self.mq_user = env.get('MQ_USER')
        self.mq_password = env.get('MQ_PASSWORD')

        self.queue_mode = self._resolve_queue_mode(env.get('QUEUE_MODE', MODE_PUBLISH))

        self.queue_dest_test_trigger = env.get('QUEUE_DEST_TEST_TRIGGER')
        self.queue_dest_pipeline_trigger = env.get('QUEUE_DEST_PIPELINE_TRIGGER')
        self.data_directory = env.get('DATA_DIRECTORY', '/storage/data/')
        self.data_index = env.get('DATA_INDEX', '/storage/index')

        # Credentials are always required.
        self.__assert_variable_is_defined(self.mq_user, "User not specified in MQ_USER")
        self.__assert_variable_is_defined(self.mq_password, "Password not specified in MQ_PASSWORD")

        # Destination queues only matter when this instance publishes.
        if self.is_start_publish():
            self.__assert_variable_is_defined(
                self.queue_dest_test_trigger,
                "Destination queue is not specified in QUEUE_DEST_TEST_TRIGGER",
            )
            self.__assert_variable_is_defined(
                self.queue_dest_pipeline_trigger,
                "Destination queue is not specified in QUEUE_DEST_PIPELINE_TRIGGER",
            )

    def display(self) -> None:
        """Print each public setting as ``NAME=value``, then a blank line.

        Settings are listed in alphabetical order. Only instance attributes
        are inspected, so methods never appear in the output and no
        exclusion list has to be kept in sync with the class. ``mq_password``
        is skipped so the secret is not echoed.
        """
        for name, value in sorted(vars(self).items()):
            if name.startswith('_') or name == 'mq_password':
                continue
            print(f"{name.upper()}={value}")
        print()

    def is_start_publish(self) -> bool:
        """Return ``True`` when the configured mode includes publishing."""
        return self.queue_mode in (MODE_PUBLISH, MODE_CONSUME_AND_PUBLISH)
| |
|