File size: 2,875 Bytes
4b63644
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4e9265f
4b63644
 
 
 
 
 
 
 
 
 
 
4e9265f
 
4b63644
 
 
 
 
 
 
 
4e9265f
 
 
4b63644
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import os
import sys

# Canonical queue-mode names; Config.queue_mode is normalised to one of these.
# consume: take elements from a queue.
MODE_CONSUME = 'consume'
# consume-and-publish: take elements from a queue and publish to another queue.
MODE_CONSUME_AND_PUBLISH = 'consume-and-publish'
# publish: emit elements to a queue (default when QUEUE_MODE is not set).
MODE_PUBLISH = 'publish'


class Config:
    """Message-queue settings loaded from environment variables.

    Environment variables read by the constructor:

    * ``MQ_HOST`` (default ``mq-service``), ``MQ_USER``, ``MQ_PASSWORD``
    * ``QUEUE_MODE`` (default: publish) - a canonical mode name or an alias
    * ``QUEUE_DEST_TEST_TRIGGER``, ``QUEUE_DEST_PIPELINE_TRIGGER``
    * ``DATA_DIRECTORY`` (default ``/storage/data/``)
    * ``DATA_INDEX`` (default ``/storage/index``)

    A missing/empty required variable or an unknown ``QUEUE_MODE`` prints a
    message and terminates the process with exit status 1.
    """

    @staticmethod
    def __assert_variable_is_defined(value: object, message: str) -> None:
        """Print ``message`` and exit with status 1 if ``value`` is unset.

        ``None`` and the empty string both count as unset: an environment
        variable that is exported but empty carries no usable value.

        Raises:
            SystemExit: with code 1 when ``value`` is ``None`` or ``''``.
        """
        if value is None or value == '':
            print(message)
            # https://docs.python.org/3/library/sys.html#sys.exit
            sys.exit(1)

    @staticmethod
    def _canonical_queue_mode(raw_mode: str):
        """Map a mode name or alias to its canonical ``MODE_*`` value.

        Returns ``None`` when ``raw_mode`` is not a recognised name.
        """
        # Built per call so the class body itself does not depend on the
        # module-level constants at definition time.
        aliases = {
            # consume: take elements from a queue (alias pull, alias store)
            MODE_CONSUME: MODE_CONSUME,
            'pull': MODE_CONSUME,
            'store': MODE_CONSUME,
            # publish: emit elements to a queue (alias push, alias load)
            MODE_PUBLISH: MODE_PUBLISH,
            'push': MODE_PUBLISH,
            'load': MODE_PUBLISH,
            # consume-and-publish: take elements from a queue,
            # (store the last COUNT), and publish to another queue
            MODE_CONSUME_AND_PUBLISH: MODE_CONSUME_AND_PUBLISH,
            'pull-push': MODE_CONSUME_AND_PUBLISH,
            'store-n-forward': MODE_CONSUME_AND_PUBLISH,
        }
        return aliases.get(raw_mode)

    def __init__(self) -> None:
        """Read all settings from the environment and validate them.

        Raises:
            SystemExit: with code 1 when ``MQ_USER``/``MQ_PASSWORD`` is
                missing or empty, when ``QUEUE_MODE`` is not a known mode or
                alias, or (in a publishing mode) when a destination queue
                variable is missing or empty.
        """
        self.mq_host = os.environ.get('MQ_HOST', 'mq-service')
        self.mq_user = os.environ.get('MQ_USER')
        self.mq_password = os.environ.get('MQ_PASSWORD')

        # Case and surrounding whitespace are ignored when matching the mode.
        self.queue_mode = os.environ.get('QUEUE_MODE', MODE_PUBLISH).strip().lower()
        print(self.queue_mode)
        canonical_mode = self._canonical_queue_mode(self.queue_mode)
        # Fail fast: an unrecognised mode would otherwise be stored verbatim
        # and skip the destination-queue checks below without any error.
        self.__assert_variable_is_defined(
            canonical_mode,
            f"Unknown mode '{self.queue_mode}' specified in QUEUE_MODE",
        )
        self.queue_mode = canonical_mode

        self.queue_dest_test_trigger = os.environ.get('QUEUE_DEST_TEST_TRIGGER')
        self.queue_dest_pipeline_trigger = os.environ.get('QUEUE_DEST_PIPELINE_TRIGGER')
        self.data_directory = os.environ.get('DATA_DIRECTORY', '/storage/data/')
        # if mode is consume-and-publish the data_index file will be used for consume and will have a suffix ".published"
        self.data_index = os.environ.get('DATA_INDEX', '/storage/index')

        # asserts
        self.__assert_variable_is_defined(self.mq_user, "User not specified in MQ_USER")
        self.__assert_variable_is_defined(self.mq_password, "Password not specified in MQ_PASSWORD")
        if self.is_start_publish():
            self.__assert_variable_is_defined(self.queue_dest_test_trigger, "Destination queue is not specified in QUEUE_DEST_TEST_TRIGGER")
            self.__assert_variable_is_defined(self.queue_dest_pipeline_trigger, "Destination queue is not specified in QUEUE_DEST_PIPELINE_TRIGGER")

    def display(self) -> None:
        """Print every public setting as ``NAME=value``, sorted by name.

        ``mq_password`` is never printed. Only instance attributes are
        listed, so methods need no exclusion list.
        """
        for name, value in sorted(vars(self).items()):
            if name.startswith('_') or name == 'mq_password':
                continue
            print(f"{name.upper()}={value}")
        print()

    def is_start_publish(self) -> bool:
        """Return True when the configured mode publishes to a queue."""
        return self.queue_mode in (MODE_PUBLISH, MODE_CONSUME_AND_PUBLISH)