ElmiraManavi commited on
Commit
4b63644
·
1 Parent(s): 75280e5

implement demo service in message queue

Browse files
.env.variables.yml ADDED
@@ -0,0 +1,12 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ variables:
2
+ # image created by this repository
3
+ IMAGE_NAME: ec/bprm
4
+ # image used in .gitlab-ci.yml
5
+ IMAGE_CI: alpine:3.21.3
6
+ #IMAGE_CI: python:3.13.2
7
+ # https://github.com/GoogleContainerTools/kaniko/releases
8
+ IMAGE_KANIKO: gcr.io/kaniko-project/executor:v1.24.0-debug
9
+ # image used by Dockerfile
10
+ # build arguments still have issues in the latest io.fabric8:docker-maven-plugin:0.44,
11
+ # so currently the FROM image is still in Dockerfile
12
+ IMAGE_FROM: python:3.13.2
.gitlab-ci.yml ADDED
@@ -0,0 +1,57 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ include: .env.variables.yml
2
+
3
+ image: ${IMAGE_CI}
4
+
5
+ .deploy:helper:
6
+ script:
7
+ # anchor definition to determine VERSION_NUMBER
8
+ - &version_number |
9
+ # using yaml anchor to determine VERSION_NUMBER
10
+ export GIT_BRANCH_REPLACED=$(echo $CI_COMMIT_REF_NAME | tr '[:upper:]' '[:lower:]' | sed 's/\//-/g')
11
+ export GIT_COMMIT_DATETIME=$(TZ=UTC git show -s --format=%cd $CI_COMMIT_SHA --date=iso-local)
12
+ export GIT_COMMIT_DATE_ONLY=$(echo $GIT_COMMIT_DATETIME | sed 's/[-:]//g' | cut -f1 -d' ')
13
+ export GIT_COMMIT_TIME_ONLY=$(echo $GIT_COMMIT_DATETIME | sed 's/[-:]//g' | cut -f2 -d' ')
14
+ export GIT_COMMIT_TIME=${GIT_COMMIT_DATE_ONLY}-${GIT_COMMIT_TIME_ONLY}
15
+ export GIT_COMMIT_ID_DESCRIBE_REPLACED=$(git describe --abbrev=8 --always --dirty=-MODIFIED-SNAPSHOT | tr '[:upper:]' '[:lower:]' | sed 's/\//-/g')
16
+ export VERSION_NUMBER=${GIT_BRANCH_REPLACED}-${GIT_COMMIT_TIME}.${GIT_COMMIT_ID_DESCRIBE_REPLACED}
17
+
18
+ # https://docs.gitlab.com/ci/docker/authenticate_registry/
19
+
20
+ stages:
21
+ - prepare
22
+ - build
23
+
24
+ prepare:git-info:
25
+ # build file .env.ci to be used in later steps
26
+ stage: prepare
27
+ script:
28
+ - apk add git
29
+ - *version_number
30
+ - echo "VERSION_NUMBER=${VERSION_NUMBER}" > ./.env.ci
31
+ artifacts:
32
+ paths:
33
+ - .env.ci
34
+ expire_in: 1 day
35
+
36
+ # https://docs.gitlab.com/ci/docker/using_kaniko/
37
+ build:kaniko:
38
+ # kaniko builds and pushes in one step
39
+ needs: ["prepare:git-info"]
40
+ stage: build
41
+ image:
42
+ name: ${IMAGE_KANIKO}
43
+ entrypoint: [""]
44
+ script:
45
+ - ./kaniko-build-and-push.sh
46
+ when: manual
47
+
48
+ # handle-action:
49
+ # stage: action
50
+ # before_script:
51
+ # - echo "$REPO_PASS" | docker login ${IMAGE_HOST} --username $REPO_USER --password-stdin
52
+ # script:
53
+ # - ./cicontainer-install-dependencies.sh
54
+ # # https://docs.ansible.com/ansible/devel/reference_appendices/config.html#cfg-in-world-writable-dir
55
+ # - chmod -R o-w $(pwd)
56
+ # - ./handle-action.sh ${CI_TARGET}
57
+ # when: manual
.variables.parse.sh ADDED
@@ -0,0 +1,52 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/bin/sh
2
+
3
+ # source: https://stackoverflow.com/questions/5014632/how-can-i-parse-a-yaml-file-from-a-linux-shell-script
4
+ parse_ci_variables_yaml()
5
+ {
6
+ filename=${1:-.env.variables.yml}
7
+ prefix=$2
8
+ s='[[:space:]]*' w='[a-zA-Z0-9_]*' fs=$(echo @|tr @ '\034')
9
+ # skipped indents at the start (for our case defaults to 1, since we always have "variables:" present)
10
+ skipped=${3:-1}
11
+ sed -ne "s|^\($s\):|\1|" \
12
+ -e "s|^\($s\)\($w\)$s:$s[\"']\(.*\)[\"']$s\$|\1$fs\2$fs\3|p" \
13
+ -e "s|^\($s\)\($w\)$s:$s\(.*\)$s\$|\1$fs\2$fs\3|p" $filename |
14
+ awk -F$fs -vskipped=$skipped '{
15
+ indent = length($1)/2;
16
+ vname[indent] = $2;
17
+ for (i in vname) {if (i > indent) {delete vname[i]}}
18
+ if (length($3) > 0) {
19
+ vn=""; for (i=skipped; i<indent; i++) {vn=(vn)(vname[i])("_")}
20
+ printf("%s%s%s=\"%s\"\n", "'$prefix'",vn, $2, $3);
21
+ }
22
+ }'
23
+ }
24
+
25
+ eval $(parse_ci_variables_yaml)
26
+
27
+ if [ -e "./.env.ci" ]; then
28
+ # sourcing environment variables from other steps
29
+ . ./.env.ci
30
+ fi
31
+
32
+ if [ -e ".env" ]; then
33
+ . ./.env
34
+ fi
35
+
36
+ CONTAINER_IMAGE=${IMAGE_NAME}
37
+ if [ -n "$IMAGE_HOST" ]; then
38
+ CONTAINER_IMAGE=$IMAGE_HOST/$CONTAINER_IMAGE
39
+ fi
40
+ export CONTAINER_LATEST=latest
41
+ if [ -z "$1" ]; then
42
+ CONTAINER_TAG=
43
+ else
44
+ CONTAINER_TAG=$1
45
+ fi
46
+ if [ -n "$VERSION_NUMBER" ]; then
47
+ # use version number (if set) as tag
48
+ CONTAINER_TAG=${VERSION_NUMBER}
49
+ fi
50
+
51
+ export CONTAINER_IMAGE=${CONTAINER_IMAGE}
52
+ export CONTAINER_TAG=${CONTAINER_TAG}
README.md CHANGED
@@ -18,3 +18,10 @@ Edit `/src/streamlit_app.py` to customize this app to your heart's desire. :hear
18
 
19
  If you have any questions, checkout our [documentation](https://docs.streamlit.io) and [community
20
  forums](https://discuss.streamlit.io).
 
 
 
 
 
 
 
 
18
 
19
  If you have any questions, checkout our [documentation](https://docs.streamlit.io) and [community
20
  forums](https://discuss.streamlit.io).
21
+
22
## Development
Build the image:
```
docker build -t ec/demo .
```
build.sh ADDED
@@ -0,0 +1,14 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/bin/sh
2
+ # parse and export .env.variables.yml
3
+ . ./.variables.parse.sh
4
+
5
+ echo "building $CONTAINER_IMAGE:$CONTAINER_LATEST"
6
+ [ -z "$CONTAINER_TAG" ] || echo "and $CONTAINER_IMAGE:$CONTAINER_TAG"
7
+
8
+ docker\
9
+ build\
10
+ --pull\
11
+ --build-arg IMAGE_FROM=${IMAGE_FROM}\
12
+ -t $CONTAINER_IMAGE:$CONTAINER_LATEST\
13
+ `[ -z "$CONTAINER_TAG" ] || echo "-t $CONTAINER_IMAGE:$CONTAINER_TAG"`\
14
+ .
from-container-install-dependencies.sh ADDED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
 
1
+ #!/bin/sh
2
+ #apk add py3-kubernetes\
3
+ # py3-jmespath
4
+ apt update &&
5
+ apt install -y python3-pika curl &&
6
+ rm -rf /var/lib/apt/lists/*
7
+
8
+ cd /workdir
9
+ pip install -r requirements.txt --user
kaniko-build-and-push.sh ADDED
@@ -0,0 +1,105 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/bin/sh
2
+ # parse and export .env.variables.yml
3
+ if [ -n "${CI_PROJECT_DIR}" ]; then
4
+ # CI/CD environment
5
+ cd ${CI_PROJECT_DIR}
6
+ fi
7
+
8
+ . ./.variables.parse.sh
9
+
10
+ echo "building $CONTAINER_IMAGE:$CONTAINER_LATEST"
11
+ [ -z "$CONTAINER_TAG" ] || echo "and $CONTAINER_IMAGE:$CONTAINER_TAG"
12
+
13
+ # build variants
14
+
15
+ can_create_kaniko_config()
16
+ {
17
+ if [ -z "${IMAGE_KANIKO}" -o -z "${IMAGE_HOST}" -o -z "${REPO_USER}" -o -z "${REPO_PASSWORD}" ]; then
18
+ echo "1"
19
+ else
20
+ echo "0"
21
+ fi
22
+ }
23
+
24
+ create_kaniko_config()
25
+ {
26
+ TARGET_FILENAME="${1:-/kaniko/.docker/config.json}"
27
+ echo "creating temporary docker.config for kaniko (${TARGET_FILENAME})"
28
+ if [ -z "${IMAGE_HOST}" -o -z "${REPO_USER}" -o -z "${REPO_PASSWORD}" -o -z "${TARGET_FILENAME}" ]; then
29
+ echo "kaniko docker config cannot be created. are variables set? IMAGE_HOST, REPO_USER, REPO_PASSWORD"
30
+ exit 1
31
+ fi
32
+ # content for /kaniko/.docker/config.json
33
+ echo "{
34
+ \"auths\": {
35
+ \"${IMAGE_HOST}\": {
36
+ \"auth\": \"$(printf "%s:%s" "${REPO_USER}" "${REPO_PASSWORD}" | base64 | tr -d '\n')\"
37
+ }
38
+ }
39
+ }" > "${TARGET_FILENAME}"
40
+ }
41
+
42
+ build_and_push_kaniko()
43
+ {
44
+ create_kaniko_config
45
+ /kaniko/executor \
46
+ --context . \
47
+ --dockerfile Dockerfile \
48
+ --destination "${CONTAINER_IMAGE}:${CONTAINER_TAG}"
49
+ }
50
+
51
+ build_and_push_docker_kaniko()
52
+ {
53
+ TMP_KANIKO_CONFIG="kaniko.docker.config.json.tmp"
54
+ create_kaniko_config "${TMP_KANIKO_CONFIG}"
55
+
56
+ docker \
57
+ run --rm -i \
58
+ -mount type=bind,source="$(pwd)/${TMP_KANIKO_CONFIG}",target=/kaniko/.docker/config.json \
59
+ -mount type=bind,source="$(pwd)",target=/workspace \
60
+ -e "IMAGE_HOST=${IMAGE_HOST}" \
61
+ -e "REPO_USER=${REPO_USER}" \
62
+ -e "REPO_PASSWORD=${REPO_PASSWORD}" \
63
+ ${IMAGE_KANIKO} \
64
+ /kaniko/executor \
65
+ --context . \
66
+ --dockerfile Dockerfile \
67
+ --destination "${CONTAINER_IMAGE}:${CONTAINER_TAG}"
68
+
69
+ rm "${TMP_KANIKO_CONFIG}"
70
+ }
71
+
72
+ build_docker_only()
73
+ {
74
+ docker \
75
+ build \
76
+ --build-arg "IMAGE_FROM=${IMAGE_FROM}" \
77
+ -t $CONTAINER_IMAGE:$CONTAINER_LATEST \
78
+ $([ -z "$CONTAINER_TAG" ] && echo "" || echo "-t $CONTAINER_IMAGE:$CONTAINER_TAG") \
79
+ .
80
+ }
81
+
82
+ push_docker_only()
83
+ {
84
+ echo docker \
85
+ push \
86
+ -t $CONTAINER_IMAGE:$CONTAINER_LATEST \
87
+ $([ -z "$CONTAINER_TAG" ] && echo "" || echo "-t $CONTAINER_IMAGE:$CONTAINER_TAG")
88
+ }
89
+
90
+ HAS_REQUIRED_PARAMETERS=$(can_create_kaniko_config)
91
+ # kaniko not present, using docker to start kaniko build
92
+ if [ "${HAS_REQUIRED_PARAMETERS}" = "0" ]; then
93
+ if [ -e "/kaniko/executor" ]; then
94
+ # kaniko present, building with kaniko
95
+ echo building with kaniko
96
+ build_and_push_kaniko
97
+ else
98
+ echo building with kaniko using docker
99
+ build_and_push_docker_kaniko
100
+ fi
101
+ else
102
+ echo building with docker
103
+ build_docker_only
104
+ push_docker_only
105
+ fi
local-docker-build.sh ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/bin/sh
2
+ # parse and export .env.variables.yml
3
+ . ./.variables.parse.sh
4
+
5
+ echo "building $CONTAINER_IMAGE:$CONTAINER_LATEST"
6
+ [ -z "$CONTAINER_TAG" ] || echo "and $CONTAINER_IMAGE:$CONTAINER_TAG"
7
+
8
+ docker\
9
+ build\
10
+ --build-arg IMAGE_FROM=${IMAGE_FROM}\
11
+ -t $CONTAINER_IMAGE:$CONTAINER_LATEST\
12
+ $([ -z "$CONTAINER_TAG" ] && echo "" || echo "-t $CONTAINER_IMAGE:$CONTAINER_TAG")\
13
+ .
push.sh ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/bin/sh
2
+ # parse and export .env.variables.yml
3
+ . ./.variables.parse.sh
4
+ echo "pushing $CONTAINER_IMAGE:$CONTAINER_LATEST" &&\
5
+ docker\
6
+ push\
7
+ $CONTAINER_IMAGE:$CONTAINER_LATEST
8
+
9
+ [ -z "$CONTAINER_TAG" ] ||\
10
+ echo "pushing $CONTAINER_IMAGE:$CONTAINER_TAG" &&\
11
+ docker\
12
+ push\
13
+ $CONTAINER_IMAGE:$CONTAINER_TAG
requirements.txt CHANGED
Binary files a/requirements.txt and b/requirements.txt differ
 
run-ci-container.sh ADDED
@@ -0,0 +1,20 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/bin/sh
2
+ # parse and export .env.variables.yml
3
+ . ./.variables.parse.sh
4
+
5
+ export BUILDKIT_PROGRESS=plain
6
+
7
+ # special case for windows
8
+ DOCKER_PREFIX=$(which winpty)
9
+ if [ -n "$DOCKER_PREFIX" ]; then
10
+ DOCKER_PREFIX="$DOCKER_PREFIX "
11
+ fi
12
+
13
+ # https://superuser.com/questions/344478/bash-execute-command-given-in-commandline-and-dont-exit
14
+ ${DOCKER_PREFIX}docker run \
15
+ --rm\
16
+ -ti\
17
+ --mount "type=bind,src=/var/run/docker.sock,target=/var/run/docker.sock"\
18
+ --mount "type=bind,src=$(pwd),target=/workdir"\
19
+ "$IMAGE_CI"\
20
+ sh -c "cd /workdir ; exec sh"
run-from-container.sh ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/bin/sh
2
+ # parse and export .env.variables.yml
3
+ . ./.variables.parse.sh
4
+
5
+ export BUILDKIT_PROGRESS=plain
6
+
7
+ # special case for windows
8
+ DOCKER_PREFIX=$(which winpty)
9
+ if [ -n "$DOCKER_PREFIX" ]; then
10
+ DOCKER_PREFIX="$DOCKER_PREFIX "
11
+ fi
12
+
13
+ # https://superuser.com/questions/344478/bash-execute-command-given-in-commandline-and-dont-exit
14
+ ${DOCKER_PREFIX}docker run \
15
+ --rm\
16
+ -ti\
17
+ --mount "type=bind,src=$(pwd),target=/workdir"\
18
+ "$IMAGE_FROM"\
19
+ bash -c "echo Entering FROM container && /workdir/from-container-install-dependencies.sh && cd /workdir; exec bash"
src/__init__.py ADDED
File without changes
src/pages/Pipeline_Demo.py ADDED
@@ -0,0 +1,77 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import logging
3
+ import threading
4
+ from typing import Callable
5
+
6
+ import streamlit as st
7
+ from dotenv import load_dotenv
8
+
9
+ from project import Config, DemoThread
10
+ from project.adapters import InAdapter, OutAdapter, MessageQueueOutAdapterImpl
11
+ from wrapper import HealthCheck
12
+
13
+
14
+ logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
15
+ logger = logging.getLogger(__name__)
16
+
17
+ load_dotenv()
18
+
19
+
20
@st.cache_resource
def create_project_thread(_config: Config) -> DemoThread:
    """Build the DemoThread for this process (cached by Streamlit).

    The leading underscore on ``_config`` tells ``st.cache_resource`` not to
    hash the argument, so the thread is created exactly once per process.

    :param _config: application configuration (MQ credentials, queue names)
    :return: DemoThread wired with an input adapter and an output adapter
    """
    # the demo page never consumes, so the no-op InAdapter is sufficient
    in_adapter_injector: Callable[[Config], InAdapter] = lambda c: InAdapter()

    # bugfix: use the _config parameter instead of the module-global `config`
    # (the old body only worked by accident because the global existed)
    if _config.mq_user == "local":
        logger.info("Running with local OutAdapter (no MQ connection).")
        out_adapter_injector: Callable[[Config], OutAdapter] = lambda c: OutAdapter()
    else:
        logger.info(f"Using real MessageQueueOutAdapter -> {_config.queue_dest}")
        out_adapter_injector: Callable[[Config], OutAdapter] = lambda c: MessageQueueOutAdapterImpl(c, c.queue_dest)

    return DemoThread(_config, None, in_adapter_injector, out_adapter_injector)
32
+
33
+
34
# module level: runs on every Streamlit script execution
config = Config()
config.display()


# cached resource: the same thread object is reused across reruns
publish_thread = create_project_thread(config)

if config.is_start_publish():
    # NOTE(review): message says "consume" but this starts the publish-mode
    # thread's run loop — confirm the wording
    logger.info("Starting MQ consume thread...")
    consume_thread = threading.Thread(target=publish_thread.run, daemon=True)
    consume_thread.start()



st.title("Pipeline Demo")
st.markdown("Sende eine Url an die Message Queue um die Pipeline zu starten:")

user_input = st.text_input("URL eingeben:")

if st.button("Senden"):
    if user_input:
        try:
            # serialize as str() of a dict; the consumer parses it back
            # with ast.literal_eval (see DemoThread.handle_element)
            data = {"url": user_input}
            message_bytes = bytes(str(data), "utf-8")

            # call the OutAdapter directly to send
            # NOTE(review): reaches into the name-mangled private attribute
            # _DemoThread__queue_dest — consider exposing a public accessor
            publish_thread._DemoThread__queue_dest.send_message(message_bytes)

            logger.info(f"Message sent to MQ: {data}")
        except Exception as e:
            # log the full traceback; the UI is not informed of the failure
            logger.exception("Fehler beim MQ-Senden")
    else:
        st.warning("Bitte eine Nachricht eingeben, bevor du sendest.")


# ------------------------------------------------------------
# Optional: debug section
# ------------------------------------------------------------
with st.expander("Debug-Informationen"):
    st.json({
        "mq_host": config.mq_host,
        "mq_user": config.mq_user,
        "mq_queue_dest": config.queue_dest,
        "is_start_publish": config.is_start_publish()
    })
src/project/__init__.py ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ from .config import Config
2
+ from .demo_thread import DemoThread
src/project/adapters/__init__.py ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ from .in_adapter import InAdapter
2
+ from .message_queue_in_adapter_impl import MessageQueueInAdapterImpl
3
+ from .message_queue_out_adapter_impl import MessageQueueOutAdapterImpl
4
+ from .out_adapter import OutAdapter
src/project/adapters/in_adapter.py ADDED
@@ -0,0 +1,12 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from abc import abstractmethod
2
+ from typing import Callable
3
+
4
+ from pika import spec
5
+ from pika.adapters.blocking_connection import BlockingChannel
6
+
7
+
8
class InAdapter:
    """Input-side port: a source of messages for the demo pipeline.

    NOTE(review): this class does not inherit abc.ABC, so @abstractmethod is
    purely advisory — the class can still be instantiated, and the demo page
    does instantiate it directly as a no-op input adapter.
    """

    @abstractmethod
    def start_consuming(self, callback: Callable[[BlockingChannel, spec.Basic.Deliver, spec.BasicProperties, bytes], None]) -> None:
        """Block and deliver each received message to ``callback``; no-op in this base class."""
        pass
src/project/adapters/message_queue_in_adapter_impl.py ADDED
@@ -0,0 +1,19 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Callable
2
+
3
+ from pika import spec
4
+ from pika.adapters.blocking_connection import BlockingChannel
5
+
6
+ from project import Config
7
+ from project.adapters.in_adapter import InAdapter
8
+ from wrapper import MessageQueue
9
+
10
+
11
class MessageQueueInAdapterImpl(InAdapter):
    """InAdapter backed by a RabbitMQ queue (via wrapper.MessageQueue)."""

    def __init__(self, config: Config, queue_name: str) -> None:
        """Open the underlying queue using the credentials in *config*."""
        self.__config = config
        # the wrapper declares the queue and keeps the channel open
        self.__queue = MessageQueue(
            config.mq_host,
            config.mq_user,
            config.mq_password,
            queue_name,
        )

    def start_consuming(self, callback: Callable[[BlockingChannel, spec.Basic.Deliver, spec.BasicProperties, bytes], None]) -> None:
        """Delegate blocking message consumption to the queue wrapper."""
        self.__queue.start_consuming(callback)
src/project/adapters/message_queue_out_adapter_impl.py ADDED
@@ -0,0 +1,14 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from project import Config
2
+ from project.adapters.out_adapter import OutAdapter
3
+ from wrapper import MessageQueue
4
+
5
+
6
class MessageQueueOutAdapterImpl(OutAdapter):
    """OutAdapter that publishes messages to a RabbitMQ queue."""

    def __init__(self, config: Config, queue_name: str) -> None:
        """Connect to the broker configured in *config* and declare *queue_name*."""
        self.__config = config
        # connection handling and queue declaration live in MessageQueue
        self.__queue = MessageQueue(
            config.mq_host,
            config.mq_user,
            config.mq_password,
            queue_name,
        )

    def send_message(self, message: bytes) -> None:
        """Forward *message* unchanged to the underlying queue."""
        self.__queue.send_message(message)
src/project/adapters/out_adapter.py ADDED
@@ -0,0 +1,8 @@
 
 
 
 
 
 
 
 
 
1
+ from abc import abstractmethod
2
+
3
+
4
class OutAdapter:
    """Output-side port: a sink for messages produced by the pipeline.

    NOTE(review): not an abc.ABC, so @abstractmethod does not prevent
    instantiation — the demo instantiates OutAdapter() directly as a
    do-nothing sink in local mode.
    """

    @abstractmethod
    def send_message(self, message: bytes) -> None:
        """Send *message*; this base implementation silently discards it."""
        pass
src/project/config.py ADDED
@@ -0,0 +1,65 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import sys
3
+
4
+ MODE_CONSUME = 'consume'
5
+ MODE_CONSUME_AND_PUBLISH = 'consume-and-publish'
6
+ MODE_PUBLISH = 'publish'
7
+
8
+
9
class Config:
    """Configuration read from environment variables.

    Terminates the process (exit code 1) when a mandatory variable is
    missing — this is the established fail-fast behavior callers rely on.
    """

    @staticmethod
    def __assert_variable_is_defined(value: ..., message: str) -> None:
        """Print *message* and terminate the process when *value* is None."""
        if value is None:
            print(message)
            # https://docs.python.org/3/library/sys.html#sys.exit
            sys.exit(1)

    def __init__(self) -> None:
        # message queue connection settings
        self.mq_host = os.environ.get('MQ_HOST', 'mq-service')
        self.mq_user = os.environ.get('MQ_USER')
        self.mq_password = os.environ.get('MQ_PASSWORD')

        # normalize QUEUE_MODE; aliases map onto the three canonical modes:
        #   consume (pull/store), publish (push/load),
        #   consume-and-publish (pull-push/store-n-forward)
        # unknown values pass through unchanged (both is_start_* stay False)
        # (replaces three repeated if-chains and a leftover debug print)
        self.queue_mode = os.environ.get('QUEUE_MODE', MODE_CONSUME).lower()
        aliases = {
            'pull': MODE_CONSUME,
            'store': MODE_CONSUME,
            'push': MODE_PUBLISH,
            'load': MODE_PUBLISH,
            'pull-push': MODE_CONSUME_AND_PUBLISH,
            'store-n-forward': MODE_CONSUME_AND_PUBLISH,
        }
        self.queue_mode = aliases.get(self.queue_mode, self.queue_mode)

        self.queue_src = os.environ.get('QUEUE_SRC')
        self.queue_dest = os.environ.get('QUEUE_DEST')
        self.data_directory = os.environ.get('DATA_DIRECTORY', '/storage/data/')
        # if mode is consume-and-publish the data_index file will be used for
        # consume and will have a suffix ".published" for the publish side
        self.data_index = os.environ.get('DATA_INDEX', '/storage/index')

        # fail fast on missing mandatory settings
        self.__assert_variable_is_defined(self.mq_user, "User not specified in MQ_USER")
        self.__assert_variable_is_defined(self.mq_password, "Password not specified in MQ_PASSWORD")
        if self.is_start_publish():
            self.__assert_variable_is_defined(self.queue_dest, "Destination queue is not specified in QUEUE_DEST")
        if self.is_start_consume():
            self.__assert_variable_is_defined(self.queue_src, "Source queue is not specified in QUEUE_SRC")

    def display(self) -> None:
        """Print all public settings, one per line (secrets and methods excluded)."""
        for name in sorted(dir(self)):
            if not name.startswith('_') and name not in [
                'display',
                'is_start_consume',
                'is_start_publish',
                'mq_password',
            ]:
                print(f"{name.upper()}={getattr(self, name)}")
        print()

    def is_start_consume(self) -> bool:
        """True when this instance should consume from QUEUE_SRC."""
        return self.queue_mode in [MODE_CONSUME, MODE_CONSUME_AND_PUBLISH, ]

    def is_start_publish(self) -> bool:
        """True when this instance should publish to QUEUE_DEST."""
        return self.queue_mode in [MODE_PUBLISH, MODE_CONSUME_AND_PUBLISH, ]
src/project/demo_thread.py ADDED
@@ -0,0 +1,49 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import ast
2
+ from typing import Callable, Union
3
+
4
+ from project.adapters import InAdapter, OutAdapter
5
+ from wrapper import HealthCheck
6
+ from . import Config
7
+
8
+
9
class DemoThread:
    """Consumes messages from the source adapter and republishes each URL to the destination adapter."""

    def __init__(
            self,
            config: Config,
            health_check: Union[HealthCheck, None],
            in_adapter_injector: Callable[[Config], InAdapter],
            out_adapter_injector: Callable[[Config], OutAdapter]
    ) -> None:
        """
        Create DemoThread instance.
        :param config: configuration options
        :param health_check: health check (may be None; stored but not used here)
        :param in_adapter_injector: factory producing the input adapter from config
        :param out_adapter_injector: factory producing the output adapter from config
        """
        self.__config = config
        self.__health_check = health_check
        # initialize queues via the injected factories
        self.__queue_src = in_adapter_injector(config)
        self.__queue_dest = out_adapter_injector(config)

    def handle_element(self, ch, method, properties, body: bytes) -> None:
        # callback invoked for every consumed message
        print(f"CONSUME: Received element {properties}: {len(body)}")

        try:
            # body is expected to be the str() of a dict like {'url': ...};
            # ast.literal_eval safely parses that literal form
            url = ast.literal_eval(body.decode('utf-8'))['url']
            data = {"url": url}
            self.__queue_dest.send_message(bytes(str(data), encoding="utf-8"))
            print(f"CONSUME: Send element {properties}")

        except Exception as e:
            # broad catch keeps the consume loop alive on malformed messages
            print("Error", e)

    def run(self) -> None:
        """Start the blocking consume loop (does not return under normal operation)."""
        # consume elements
        # TODO: are these callbacks called only one at a time or in multiple threads?
        self.__queue_src.start_consuming(self.handle_element)
src/streamlit_app.py CHANGED
@@ -1 +1,3 @@
1
  import streamlit as st
 
 
 
1
  import streamlit as st
2
+
3
+
src/wrapper/__init__.py ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ from .health_check import HealthCheck
2
+ from .message_queue import MessageQueue
src/wrapper/health_check.py ADDED
@@ -0,0 +1,73 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ from typing import Callable, Dict
3
+
4
+ from flask import Flask
5
+ from flask.typing import ResponseValue
6
+
7
+
8
+ # adapted from https://stackoverflow.com/questions/62000942/flask-block-specific-endpoints-from-logging
9
# adapted from https://stackoverflow.com/questions/62000942/flask-block-specific-endpoints-from-logging
class AjaxFilter(logging.Filter):
    """Logging filter that drops access-log records mentioning the /health endpoint."""

    def filter(self, record):
        # keep the record only when its message does not reference /health
        message = record.getMessage()
        return "/health" not in message
12
+
13
+
14
+ class HealthCheck:
15
+ # adapted from https://thelinuxnotes.com/index.php/implementing-a-flask-health-check-and-kubernetes-liveness-probe-in-python-application/
16
+ def __do_check(self) -> tuple[ResponseValue, int]:
17
+ # https://docs.python.org/3.9/library/functions.html#all
18
+ # https://stackoverflow.com/questions/12229064/mapping-over-values-in-a-python-dictionary
19
+ # loop over checks
20
+ # execute check method for each key
21
+ results = {k: v() for k, v in self.__checks.items()}
22
+ overall_result = all(results.values())
23
+ json = {
24
+ "state": "OK" if overall_result else "ERROR",
25
+ "elements": results
26
+ }
27
+ status_code = 200 if overall_result else 500
28
+ return json, status_code
29
+
30
+ def __init__(self, app: Flask) -> None:
31
+ self.__app = app
32
+ self.__checks: Dict[str, Callable[[], bool]] = {}
33
+ self.__log = logging.getLogger('werkzeug')
34
+ self.disable_logging_filter = AjaxFilter()
35
+
36
+ # register route dynamically
37
+ # https://pytutorial.com/flask-appadd_url_rule-dynamic-url-registration-guide/
38
+ # @app.route('/health')
39
+ # @app.route('/health/')
40
+ self.__app.add_url_rule(
41
+ '/health', # URL rule with variable
42
+ 'health_check', # Endpoint name
43
+ view_func=self.__do_check, # View function
44
+ methods=['GET'], # Allowed methods
45
+ strict_slashes=False # URL trailing slash handling
46
+ )
47
+
48
+ def disable_logging(self) -> None:
49
+ print("health check logging disabled")
50
+ self.__log.addFilter(self.disable_logging_filter)
51
+
52
+ def enable_logging(self) -> None:
53
+ print("health check logging enabled")
54
+ self.__log.removeFilter(self.disable_logging_filter)
55
+
56
+ def add_check(self, name, method: Callable[[], bool]) -> None:
57
+ """method to add named check methods to health check
58
+ Parameters
59
+ ----------
60
+ name : str
61
+ named key of check method
62
+ method : function
63
+ method which returns a bool for the health check
64
+ """
65
+ self.__checks.update({f"{name}": method})
66
+
67
+ def wrap_exception_handler(self, name: str, method: Callable[[], None]) -> None:
68
+ """wrap exception handler around method and register simple health check for it"""
69
+ self.add_check(name, lambda: True)
70
+ try:
71
+ method()
72
+ finally:
73
+ self.add_check(name, lambda: False)
src/wrapper/message_queue.py ADDED
@@ -0,0 +1,80 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Callable
2
+
3
+ import pika
4
+ from pika import BlockingConnection, spec
5
+ from pika.adapters.blocking_connection import BlockingChannel
6
+ from pika.exceptions import StreamLostError
7
+
8
+
9
class MessageQueue:
    """Thin wrapper around a blocking pika connection to a single durable queue."""

    __channel: BlockingChannel
    __connection: BlockingConnection
    __callback: Callable[[BlockingChannel, spec.Basic.Deliver, spec.BasicProperties, bytes], None]

    def __init__(self, host: str, user: str, password: str, queue_name: str) -> None:
        """Connect to *host* with the given credentials and declare *queue_name* (durable)."""
        self.__host = host
        self.__user = user
        self.__password = password
        self.__queue_name = queue_name
        self.connect()
        # create queue if necessary
        self.__channel.queue_declare(queue=self.__queue_name, durable=True)

    def close(self) -> None:
        """Close the underlying AMQP connection."""
        self.__connection.close()

    def connect(self) -> None:
        """(Re-)open the blocking connection and channel."""
        self.__connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                credentials=pika.PlainCredentials(self.__user, self.__password),
                host=self.__host
            ))
        self.__channel = self.__connection.channel()

    def send_message(self, message: bytes) -> None:
        """Publish *message* persistently to the queue.

        If the broker connection was lost, reconnect once and retry the
        publish; a second failure propagates to the caller.
        """
        def __send():
            self.__channel.basic_publish(
                exchange='',
                routing_key=self.__queue_name,
                body=message,
                properties=pika.BasicProperties(
                    delivery_mode=pika.DeliveryMode.Persistent
                ))

        try:
            __send()
        except StreamLostError:
            # bugfix: the previous retry loop reconnected but cleared its
            # retry flag before looping, so the message was never resent
            # and was silently dropped. Reconnect, then publish again.
            print("Reconnecting...")
            self.connect()
            __send()

    def __ack_and_call_callback(
            self,
            ch: BlockingChannel,
            method: spec.Basic.Deliver,
            properties: spec.BasicProperties,
            body: bytes) -> None:
        # acknowledge processing directly, before invoking the callback
        ch.basic_ack(delivery_tag=method.delivery_tag)
        self.__callback(ch, method, properties, body)

    def start_consuming(self, callback: Callable[[BlockingChannel, spec.Basic.Deliver, spec.BasicProperties, bytes], None]) -> None:
        """Blockingly consume from the queue, invoking *callback* per message.

        callback method parameter is a method with the following signature:
        def callback(ch, method, properties, body)
        """
        self.__callback = callback
        # deliver one unacknowledged message at a time (fair dispatch)
        self.__channel.basic_qos(prefetch_count=1)
        self.__channel.basic_consume(queue=self.__queue_name, on_message_callback=self.__ack_and_call_callback)
        self.__channel.start_consuming()