Elmira Manavi commited on
Commit
cbe1cd5
·
2 Parent(s): 8cf2949 2e25d51

Merge branch 'SCRUM-37-frontend' into 'main'

Browse files
requirements.txt CHANGED
Binary files a/requirements.txt and b/requirements.txt differ
 
src/pages/Eventportal.py CHANGED
@@ -1,12 +1,10 @@
1
- import os
2
  from datetime import datetime
 
3
 
4
  from bson import ObjectId
5
  from streamlit import streamlit as st
6
 
7
- from dotenv import load_dotenv
8
- from pymongo import MongoClient
9
- from itertools import batched
10
 
11
  st.set_page_config(layout="wide")
12
 
@@ -26,23 +24,16 @@ st.markdown(
26
  review_options = [":green[:material/done:]", ":red[:material/highlight_off:]"]
27
  review_filter_options = ["Abgeschlossene Reviews", "Offene Reviews"]
28
  review_fields = [
29
- ("Titel", "title_valid"),
30
- ("Kategorie", "category_valid"),
31
- ("Subkategorie", "format_valid"),
32
- ("Startdatum", "startdate_valid"),
33
- ("Startzeit", "starttime_valid"),
34
- ("Enddatum", "enddate_valid"),
35
- ("Endzeit", "endtime_valid"),
36
- ("Adresse", "address_valid"),
37
- ("Link", "url_valid")
38
- ]
39
-
40
- @st.cache_resource
41
- def init_connection():
42
- load_dotenv()
43
- uri = f"mongodb+srv://{os.getenv('MONGO_USERNAME')}:{os.getenv('MONGO_PASSWORD')}@{os.getenv('MONGO_HOST')}/?retryWrites=true&w=majority&appName=Cluster0"
44
- client = MongoClient(uri)
45
- return client.event_data
46
 
47
 
48
  @st.cache_resource
@@ -183,9 +174,9 @@ def display_event(event_data):
183
 
184
  if comments:
185
  st.write("**Kommentare**")
186
- for comment_id,comment in enumerate(comments):
187
  st.write(f"💬 {comment}")
188
- st.chat_input("Neuer Kommentar" , key="comment_input" + str(event_data["_id"]),
189
  on_submit=update_review_comment,
190
  args=(str(event_data["_id"]),))
191
 
@@ -193,8 +184,6 @@ def display_event(event_data):
193
  def render_review_controls(event_data):
194
  """Erzeugt dynamisch alle Review-Segmented-Controls für ein Event."""
195
 
196
-
197
-
198
  # Dynamisch Spalten erzeugen
199
  cols = st.columns(len(review_fields))
200
 
@@ -335,13 +324,13 @@ batches = st.session_state.batched_events
335
 
336
  # Page Content
337
 
338
- col_rev_toggle, col_control1 = st.columns([1,4])
339
  with col_rev_toggle:
340
  st.toggle("Review Mode", value=False, on_change=toggle_review_mode)
341
  if st.session_state.review_mode:
342
  with col_control1:
343
- st.pills("Filter Reviews", review_filter_options, key="filter_reviews_input", label_visibility="collapsed", on_change=update_filter)
344
-
345
 
346
  st.title("Eventportal")
347
 
 
 
1
  from datetime import datetime
2
+ from itertools import batched
3
 
4
  from bson import ObjectId
5
  from streamlit import streamlit as st
6
 
7
+ from services.db_service import init_connection
 
 
8
 
9
  st.set_page_config(layout="wide")
10
 
 
24
  review_options = [":green[:material/done:]", ":red[:material/highlight_off:]"]
25
  review_filter_options = ["Abgeschlossene Reviews", "Offene Reviews"]
26
  review_fields = [
27
+ ("Titel", "title_valid"),
28
+ ("Kategorie", "category_valid"),
29
+ ("Subkategorie", "format_valid"),
30
+ ("Startdatum", "startdate_valid"),
31
+ ("Startzeit", "starttime_valid"),
32
+ ("Enddatum", "enddate_valid"),
33
+ ("Endzeit", "endtime_valid"),
34
+ ("Adresse", "address_valid"),
35
+ ("Link", "url_valid")
36
+ ]
 
 
 
 
 
 
 
37
 
38
 
39
  @st.cache_resource
 
174
 
175
  if comments:
176
  st.write("**Kommentare**")
177
+ for comment_id, comment in enumerate(comments):
178
  st.write(f"💬 {comment}")
179
+ st.chat_input("Neuer Kommentar", key="comment_input" + str(event_data["_id"]),
180
  on_submit=update_review_comment,
181
  args=(str(event_data["_id"]),))
182
 
 
184
  def render_review_controls(event_data):
185
  """Erzeugt dynamisch alle Review-Segmented-Controls für ein Event."""
186
 
 
 
187
  # Dynamisch Spalten erzeugen
188
  cols = st.columns(len(review_fields))
189
 
 
324
 
325
  # Page Content
326
 
327
+ col_rev_toggle, col_control1 = st.columns([1, 4])
328
  with col_rev_toggle:
329
  st.toggle("Review Mode", value=False, on_change=toggle_review_mode)
330
  if st.session_state.review_mode:
331
  with col_control1:
332
+ st.pills("Filter Reviews", review_filter_options, key="filter_reviews_input", label_visibility="collapsed",
333
+ on_change=update_filter)
334
 
335
  st.title("Eventportal")
336
 
src/pages/Pipeline_Demo.py CHANGED
@@ -1,43 +1,15 @@
1
- import os
2
- import logging
3
- import threading
4
- from typing import Callable
5
-
6
  import streamlit as st
7
  from dotenv import load_dotenv
8
 
9
- from project import Config, DemoThread
10
- from project.adapters import InAdapter, OutAdapter, MessageQueueOutAdapterImpl
11
-
12
- logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
13
- logger = logging.getLogger(__name__)
14
 
15
  load_dotenv()
16
 
17
-
18
- @st.cache_resource
19
- def create_project_thread(_config: Config) -> DemoThread:
20
- in_adapter_injector: Callable[[Config], InAdapter] = lambda c: InAdapter()
21
-
22
- if config.mq_user == "local":
23
- logger.info("Running with local OutAdapter (no MQ connection).")
24
- out_adapter_injector: Callable[[Config], OutAdapter] = lambda c: OutAdapter()
25
- else:
26
- logger.info(f"Using real MessageQueueOutAdapter -> {config.queue_dest}")
27
- out_adapter_injector: Callable[[Config], OutAdapter] = lambda c: MessageQueueOutAdapterImpl(c, c.queue_dest)
28
-
29
- return DemoThread(config, None, in_adapter_injector, out_adapter_injector)
30
-
31
-
32
  config = Config()
33
  config.display()
34
 
35
- publish_thread = create_project_thread(config)
36
-
37
- if config.is_start_publish():
38
- logger.info("Starting MQ consume thread...")
39
- consume_thread = threading.Thread(target=publish_thread.run, daemon=True)
40
- consume_thread.start()
41
 
42
  st.title("Pipeline Demo")
43
  st.markdown("Sende eine Url an die Message Queue um die Pipeline zu starten:")
@@ -47,11 +19,9 @@ user_input = st.text_input("URL eingeben:")
47
  if st.button("Senden"):
48
  if user_input:
49
  try:
50
-
51
- publish_thread._DemoThread__queue_dest.send_message(user_input.strip())
52
  st.info("URL wurde gesendet")
53
- logger.info(f"Message sent to MQ: {user_input}")
54
  except Exception as e:
55
- logger.exception("Fehler beim MQ-Senden")
56
  else:
57
  st.warning("Bitte eine Nachricht eingeben, bevor du sendest.")
 
 
 
 
 
 
1
  import streamlit as st
2
  from dotenv import load_dotenv
3
 
4
+ from project import Config
5
+ from services.mq_service import create_out_adapter
 
 
 
6
 
7
  load_dotenv()
8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
  config = Config()
10
  config.display()
11
 
12
+ out_adapter = create_out_adapter(config, config.queue_dest_pipeline_trigger)
 
 
 
 
 
13
 
14
  st.title("Pipeline Demo")
15
  st.markdown("Sende eine Url an die Message Queue um die Pipeline zu starten:")
 
19
  if st.button("Senden"):
20
  if user_input:
21
  try:
22
+ out_adapter.send_message(user_input.strip())
 
23
  st.info("URL wurde gesendet")
 
24
  except Exception as e:
25
+ st.error(f"Fehler beim senden in die Message Queue: {e}")
26
  else:
27
  st.warning("Bitte eine Nachricht eingeben, bevor du sendest.")
src/pages/Test_Evaluation.py ADDED
@@ -0,0 +1,256 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import streamlit as st
from bson import ObjectId
from html_to_markdown import convert

from services import init_connection

# Use the full browser width, but cap the content column for readability.
st.set_page_config(layout="wide")

st.markdown(
    """
    <style>
    .block-container {
        width: 80vw;
        max-width: 1400px;
        margin: 0 auto;
    }
    </style>
    """,
    unsafe_allow_html=True,
)
24
+
25
+
26
def print_schedule_obj(s):
    """Format one schedule dict as 'DD.MM.YYYY - DD.MM.YYYY | HH:MM - HH:MM' + two newlines.

    Falls back to the legacy key spelling (e.g. ``startdate`` for
    ``start_date``) when the primary key is missing or falsy; missing
    values render as the empty string.
    """

    def _fmt(primary, legacy, fmt):
        # A falsy primary value falls through to the legacy key, mirroring
        # the original `if not value:` chain.
        value = s.get(primary) or s.get(legacy)
        return value.strftime(fmt) if value else ""

    start_date_str = _fmt("start_date", "startdate", "%d.%m.%Y")
    end_date_str = _fmt("end_date", "enddate", "%d.%m.%Y")
    start_time_str = _fmt("start_time", "starttime", "%H:%M")
    end_time_str = _fmt("end_time", "endtime", "%H:%M")

    return f"{start_date_str} - {end_date_str} | {start_time_str} - {end_time_str}\n\n"
48
+
49
+
50
def create_data_metrics_df(overall_metrics: dict) -> pd.DataFrame:
    """Build a DataFrame of the float-valued metrics per extraction field.

    Rows are the extraction fields (known fields first, in display order;
    any additional fields after), columns are the metric names.  Non-float
    entries (e.g. raw tp/fp counters) are dropped.

    Fixes vs. previous version: removed a leftover debug ``print`` and a
    redundant ``metrics.copy()`` before the filtering comprehension.
    """
    field_order = ["page_type", "title", "locations", "schedule", "start_date", "end_date", "start_time", "end_time"]

    # Seed the dict in display order so known fields keep a stable row order.
    rows = {key: "" for key in field_order if key in overall_metrics}

    for field, metrics in overall_metrics.items():
        # Keep only float metrics (precision/recall/...); integer counters
        # (tp, fn, ...) are visualized in the confusion matrix instead.
        rows[field] = {k: v for k, v in metrics.items() if isinstance(v, float)}

    return pd.DataFrame(rows).T
63
+
64
+
65
def create_confusion_matrix(overall_metrics: dict):
    """Render the page_type tp/tn/fp/fn counts as a seaborn heatmap figure."""
    counts = overall_metrics.get("page_type", {})

    # 2x2 confusion matrix: rows = expected class, columns = predicted class.
    cm = pd.DataFrame(
        [
            [counts.get("tn", 0), counts.get("fp", 0)],
            [counts.get("fn", 0), counts.get("tp", 0)],
        ],
        index=['Expected NO_EVENT', 'Expected EVENT'],
        columns=['Predicted NO_EVENT', 'Predicted EVENT'],
    )

    fig, ax = plt.subplots()
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax)
    ax.set_xlabel('Predicted')
    ax.set_ylabel('Expected')
    return fig
85
+
86
+
87
+ def create_fn_df(record_results: dict):
88
+ false_negatives = [v.get("data", {}).get("reason") for v in record_results.values() if
89
+ v.get("record_metrics", {}).get("page_type", {}).get("fn") == 1]
90
+
91
+ fn_counts = pd.Series(false_negatives).value_counts()
92
+ fn_percent = fn_counts / len(record_results) * 100
93
+
94
+ df = pd.DataFrame({
95
+ 'Rejected Reason': fn_counts.index,
96
+ 'Anzahl': fn_counts.values,
97
+ 'Prozent': fn_percent.values
98
+ })
99
+
100
+ total_row = pd.DataFrame([{
101
+ 'Rejected Reason': 'Gesamt',
102
+ 'Anzahl': fn_counts.sum(),
103
+ 'Prozent': fn_percent.sum()
104
+ }])
105
+ df = pd.concat([df, total_row], ignore_index=True)
106
+ df = df.style.format({'Prozent': '{:.1f}%'})
107
+ return df
108
+
109
+
110
+ def create_error_df(overall_metrics: dict, batchsize: int):
111
+ rows = []
112
+ sum = 0
113
+ for k, v in overall_metrics.get("error", {}).items():
114
+ rows.append({
115
+ 'Error': k.upper(),
116
+ 'Anzahl': v,
117
+ 'Prozent': v / batchsize * 100
118
+ })
119
+ sum += v
120
+ rows.append({
121
+ 'Error': "Gesamt",
122
+ 'Anzahl': sum,
123
+ 'Prozent': sum / batchsize * 100
124
+ })
125
+ sum += v
126
+ df = pd.DataFrame(rows).style.format({'Prozent': '{:.1f}%'})
127
+ return df
128
+
129
+
130
def create_detail_table(test: dict):
    """Flatten one test document into a per-record, per-field comparison table.

    For each record in ``test["record_results"]`` the expected values
    (looked up in ``db.testdata_1``) are placed next to the predicted
    values and their metrics; a header row carries the record id and any
    meta information.  Relies on the module-level ``db`` handle defined
    further down in this page script.
    """
    def _stringify(v):
        # NOTE(review): the dict branch is identical to the fallback — both
        # return str(v); presumably a custom dict rendering was planned.
        if isinstance(v, list):
            return ", ".join(map(str, v))
        if isinstance(v, dict):
            return str(v)
        return str(v)

    rows = []
    meta_columns = set()

    for record_id, result in test.get("record_results", {}).items():
        # Ground truth for this record lives in the testdata collection.
        validation = db.testdata_1.find_one({"_id": ObjectId(record_id)})
        expected = validation.get("data", {})
        predicted = result.get("data", {})
        metrics = result.get("record_metrics", {})
        meta = result.get("meta", {})

        # Collect meta columns (the set grows as new meta keys appear;
        # earlier rows simply lack later columns and become NaN).
        meta_columns |= {f"Meta - {k}" for k in meta}

        # Header row per record
        head = {
            "Record ID": str(record_id),
            "Field": "",
            "Expected": "",
            "Predicted": "",
            "Metrics": "",
            **{f"Meta - {k}": _stringify(meta.get(k, "")) for k in meta}
        }
        rows.append(head)

        def add(field, exp, pred):
            # One detail row: expected vs. predicted plus the field's metrics
            # (dict-valued metrics are rendered as 'k: 0.00 | ...').
            val = metrics.get(field)
            if isinstance(val, dict):
                val = " | ".join(f"{k}: {v:.2f}" for k, v in val.items())
            rows.append({
                "Record ID": "",
                "Field": field,
                "Expected": exp,
                "Predicted": pred,
                "Metrics": val if val is not None else "",
                **{col: "" for col in meta_columns}
            })

        add("page_type", validation.get("page_type"), result.get("page_type"))
        add("title", expected.get("title"), predicted.get("title"))
        add(
            "schedule",
            "\n\n".join(print_schedule_obj(s) for s in expected.get("schedule", [])),
            "\n\n".join(print_schedule_obj(s) for s in predicted.get("schedule", []))
        )
        add(
            "locations",
            ", ".join(g.get("geolocation", {}).get("formatted", "") for g in expected.get("locations", [])),
            ", ".join(g.get("geolocation", {}).get("formatted", "") for g in predicted.get("locations", []))
        )

        # Blank separator row between records
        rows.append({col: "" for col in ["Record ID", "Field", "Expected", "Predicted", "Metrics", *meta_columns]})

    return pd.DataFrame(rows)
191
+
192
+
193
@st.dialog("Original Seite", width="medium")
def show_website(url, html):
    """Streamlit dialog: show the original page converted to markdown plus a link.

    ``html`` is the raw page markup as a str; ``convert`` (html_to_markdown)
    turns it into markdown for display.
    """
    st.info(f"Link zur Original Website: {url}")
    md = convert(html)
    st.write(md)
198
+
199
+
200
# --- Page body ---------------------------------------------------------------

st.title("Test Evaluation")
db = init_connection()
tests = list(db.test_evaluation.find({}, {"_id": 1, "status": 1, "created_at": 1}))
tests_sorted = sorted(tests, key=lambda t: t["created_at"], reverse=True)

options = {str(t["_id"]): f"{t['status']} - {t['created_at'].strftime('%Y-%m-%d %H:%M:%S')}" for t in tests_sorted}

selected_id = st.selectbox("Wähle einen Test aus", options=list(options.keys()), format_func=lambda x: options[x])

if selected_id:
    test = db.test_evaluation.find_one({"_id": ObjectId(selected_id)})
    record_results = test.get("record_results", {})
    batchsize = len(record_results)

    # Single quotes inside the f-string: the previous nested double quotes
    # were a SyntaxError on Python < 3.12 (PEP 701 only relaxed this in 3.12).
    st.write(
        f"**Test ID:** {selected_id} | **Status:** {test.get('status')} | **Batchsize:** {batchsize}")

    overall_metrics = test.get("overall_metrics", {})
    if overall_metrics:
        df_data_metrics = create_data_metrics_df(overall_metrics)
        cm_fig = create_confusion_matrix(overall_metrics)
        df_fn = create_fn_df(record_results)
        df_error = create_error_df(overall_metrics, batchsize)

        st.write("# Overall Metrics")

        st.write("### Data Metrics")
        st.bar_chart(df_data_metrics, width=400, stack=False, sort=False)

        col1, col2 = st.columns(2)
        with col1:
            st.write("### Confusion Matrix für Page Classification (page_type)")
            st.pyplot(cm_fig, width=450)

        with col2:
            st.write("### Falsch abgelehnte Seiten (false negatives fn)")
            st.dataframe(df_fn)

        st.write("### Fehler in der Pipeline (error)")
        st.dataframe(df_error)
    else:
        st.info("Der Test läuft noch. Es konnte noch keine Metric erstellt werden")

    st.write("# Testergebnisse im Detail")
    df = create_detail_table(test)
    st.dataframe(df, height=600)

    record_id = st.text_input(label="Gebe eine Record ID ein um die Original Website anzusehen.", value="")
    if record_id:
        record = db.testdata_1.find_one({"_id": ObjectId(record_id)})
        html = record.get("html")
        url = record.get("url")
        # html is stored as bytes in the DB; decode before rendering.
        if html:
            html = html.decode("utf-8")
        show_website(url, html)
src/pages/Testing.py ADDED
@@ -0,0 +1,71 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import base64
import json

import streamlit as st
from dotenv import load_dotenv

from project import Config
from services.db_service import init_connection
from services.mq_service import create_out_adapter
from services.test_creator import TestCreator

# --- Setup: config, MQ out adapter, DB, test factory -------------------------
load_dotenv()
config = Config()
config.display()
out_adapter = create_out_adapter(config, config.queue_dest_test_trigger)

db = init_connection()
test_creator = TestCreator(db)

st.title("Testing")
st.write(f"Es sind {db.testdata_1.count_documents({})} Testdatensätze vorhanden.")

with st.form("Testkonfiguration"):
    batch_id = st.selectbox("Testbatch wählen", [0, 1, 2, 3, "Alle"])
    start_test = st.form_submit_button("Starte Test")

if start_test:
    # "Alle" runs the test over the complete test data set; merging the two
    # find() calls removes the duplicated branch of the previous version.
    query = {} if batch_id == "Alle" else {"batch_id": batch_id}
    batch = list(db.testdata_1.find(query))
    record_ids = [str(data["_id"]) for data in batch]

    test_id = test_creator.create(record_ids)

    if test_id:
        st.info(f"Starte Test mit {len(batch)} Testdaten, Test-ID: {str(test_id)}")
        st.write("**Testdaten aus dem Batch:**")

        for testdata in batch:
            # Renamed from `id`, which shadowed the builtin.
            record_id = str(testdata["_id"])
            url = testdata.get("url")
            # html is stored as bytes; decode, defaulting to empty string.
            raw_html = testdata.get("html", None)
            html = raw_html.decode("utf-8") if raw_html else ""
            st.write(f"Data ID: {record_id}")
            st.write(f"URL: {url}")
            st.write("___")

            try:
                # Message format expected by the pipeline trigger queue:
                # the page body travels base64-encoded inside a JSON envelope.
                message = {
                    "url": url,
                    "body": base64.b64encode(html.encode("utf-8")).decode("utf-8"),
                    "meta": [{
                        "service_name": "demo-service",
                        "queue_dest": "storehtml-out",
                        "test_id": str(test_id),
                        "record_id": record_id,
                    }]
                }
                encoded = json.dumps(message).encode("utf-8")
                out_adapter.send_message(encoded)
                print(f"Sent {url}")

            except Exception as e:
                st.error(f"Fehler beim senden in die Message Queue: {e}")
    else:
        st.error("Fehler beim anlegen des Tests")
src/project/__init__.py CHANGED
@@ -1,2 +1 @@
1
  from .config import Config
2
- from .demo_thread import DemoThread
 
1
  from .config import Config
 
src/project/adapters/__init__.py CHANGED
@@ -1,4 +1,2 @@
1
- from .in_adapter import InAdapter
2
- from .message_queue_in_adapter_impl import MessageQueueInAdapterImpl
3
  from .message_queue_out_adapter_impl import MessageQueueOutAdapterImpl
4
  from .out_adapter import OutAdapter
 
 
 
1
  from .message_queue_out_adapter_impl import MessageQueueOutAdapterImpl
2
  from .out_adapter import OutAdapter
src/project/adapters/in_adapter.py DELETED
@@ -1,12 +0,0 @@
1
- from abc import abstractmethod
2
- from typing import Callable
3
-
4
- from pika import spec
5
- from pika.adapters.blocking_connection import BlockingChannel
6
-
7
-
8
- class InAdapter:
9
-
10
- @abstractmethod
11
- def start_consuming(self, callback: Callable[[BlockingChannel, spec.Basic.Deliver, spec.BasicProperties, bytes], None]) -> None:
12
- pass
 
 
 
 
 
 
 
 
 
 
 
 
 
src/project/adapters/message_queue_in_adapter_impl.py DELETED
@@ -1,19 +0,0 @@
1
- from typing import Callable
2
-
3
- from pika import spec
4
- from pika.adapters.blocking_connection import BlockingChannel
5
-
6
- from project import Config
7
- from project.adapters.in_adapter import InAdapter
8
- from wrapper import MessageQueue
9
-
10
-
11
- class MessageQueueInAdapterImpl(InAdapter):
12
-
13
- def __init__(self, config: Config, queue_name: str) -> None:
14
- self.__config = config
15
- # initialize queue
16
- self.__queue = MessageQueue(config.mq_host, config.mq_user, config.mq_password, queue_name)
17
-
18
- def start_consuming(self, callback: Callable[[BlockingChannel, spec.Basic.Deliver, spec.BasicProperties, bytes], None]) -> None:
19
- self.__queue.start_consuming(callback)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/project/config.py CHANGED
@@ -21,7 +21,7 @@ class Config:
21
  self.mq_password = os.environ.get('MQ_PASSWORD')
22
 
23
  # different modes:
24
- self.queue_mode = os.environ.get('QUEUE_MODE', MODE_CONSUME).lower()
25
  print(self.queue_mode)
26
  # consume: take elements from a queue (alias pull, alias store)
27
  if self.queue_mode in [MODE_CONSUME, 'pull', 'store', ]:
@@ -33,8 +33,8 @@ class Config:
33
  if self.queue_mode in [MODE_CONSUME_AND_PUBLISH, 'pull-push', 'store-n-forward', ]:
34
  self.queue_mode = MODE_CONSUME_AND_PUBLISH
35
 
36
- self.queue_src = os.environ.get('QUEUE_SRC')
37
- self.queue_dest = os.environ.get('QUEUE_DEST')
38
  self.data_directory = os.environ.get('DATA_DIRECTORY', '/storage/data/')
39
  # if mode is consume-and-publish the data_index file will be used for consume and will have a suffix ".published"
40
  self.data_index = os.environ.get('DATA_INDEX', '/storage/index')
@@ -43,9 +43,9 @@ class Config:
43
  self.__assert_variable_is_defined(self.mq_user, "User not specified in MQ_USER")
44
  self.__assert_variable_is_defined(self.mq_password, "Password not specified in MQ_PASSWORD")
45
  if self.is_start_publish():
46
- self.__assert_variable_is_defined(self.queue_dest, "Destination queue is not specified in QUEUE_DEST")
47
- if self.is_start_consume():
48
- self.__assert_variable_is_defined(self.queue_src, "Source queue is not specified in QUEUE_SRC")
49
 
50
  def display(self) -> None:
51
  for name in sorted(dir(self)):
@@ -58,8 +58,6 @@ class Config:
58
  print(f"{name.upper()}={getattr(self, name)}")
59
  print()
60
 
61
- def is_start_consume(self) -> bool:
62
- return self.queue_mode in [MODE_CONSUME, MODE_CONSUME_AND_PUBLISH, ]
63
 
64
  def is_start_publish(self) -> bool:
65
  return self.queue_mode in [MODE_PUBLISH, MODE_CONSUME_AND_PUBLISH, ]
 
21
  self.mq_password = os.environ.get('MQ_PASSWORD')
22
 
23
  # different modes:
24
+ self.queue_mode = os.environ.get('QUEUE_MODE', MODE_PUBLISH).lower()
25
  print(self.queue_mode)
26
  # consume: take elements from a queue (alias pull, alias store)
27
  if self.queue_mode in [MODE_CONSUME, 'pull', 'store', ]:
 
33
  if self.queue_mode in [MODE_CONSUME_AND_PUBLISH, 'pull-push', 'store-n-forward', ]:
34
  self.queue_mode = MODE_CONSUME_AND_PUBLISH
35
 
36
+ self.queue_dest_test_trigger= os.environ.get('QUEUE_DEST_TEST_TRIGGER')
37
+ self.queue_dest_pipeline_trigger = os.environ.get('QUEUE_DEST_PIPELINE_TRIGGER')
38
  self.data_directory = os.environ.get('DATA_DIRECTORY', '/storage/data/')
39
  # if mode is consume-and-publish the data_index file will be used for consume and will have a suffix ".published"
40
  self.data_index = os.environ.get('DATA_INDEX', '/storage/index')
 
43
  self.__assert_variable_is_defined(self.mq_user, "User not specified in MQ_USER")
44
  self.__assert_variable_is_defined(self.mq_password, "Password not specified in MQ_PASSWORD")
45
  if self.is_start_publish():
46
+ self.__assert_variable_is_defined(self.queue_dest_test_trigger, "Destination queue is not specified in QUEUE_DEST_TEST_TRIGGER")
47
+ self.__assert_variable_is_defined(self.queue_dest_pipeline_trigger, "Destination queue is not specified in QUEUE_DEST_PIPELINE_TRIGGER")
48
+
49
 
50
  def display(self) -> None:
51
  for name in sorted(dir(self)):
 
58
  print(f"{name.upper()}={getattr(self, name)}")
59
  print()
60
 
 
 
61
 
62
  def is_start_publish(self) -> bool:
63
  return self.queue_mode in [MODE_PUBLISH, MODE_CONSUME_AND_PUBLISH, ]
src/project/demo_thread.py DELETED
@@ -1,48 +0,0 @@
1
- import ast
2
- from typing import Callable, Union
3
-
4
- from project.adapters import InAdapter, OutAdapter
5
- from wrapper import HealthCheck
6
- from . import Config
7
-
8
-
9
- class DemoThread:
10
- def __init__(
11
- self,
12
- config: Config,
13
- health_check: Union[HealthCheck, None],
14
- in_adapter_injector: Callable[[Config], InAdapter],
15
- out_adapter_injector: Callable[[Config], OutAdapter]
16
- ) -> None:
17
- """
18
- Create BoilerplateRemovalThread instance.
19
- :param config: configuration review_options
20
- :param health_check: health check
21
- :param in_adapter_injector: input adapter, based on config
22
- :param out_adapter_injector: output adapter, based on config
23
- """
24
- self.__config = config
25
- self.__health_check = health_check
26
- # initialize queues
27
- self.__queue_src = in_adapter_injector(config)
28
- self.__queue_dest = out_adapter_injector(config)
29
-
30
- def handle_element(self, ch, method, properties, url: bytes) -> None:
31
- print(f"CONSUME: Received element {properties}: {len(url)}")
32
-
33
- try:
34
-
35
- self.__queue_dest.send_message(bytes(str(url), encoding="utf-8"))
36
- print(f"CONSUME: Send element {properties}")
37
-
38
- except Exception as e:
39
- print("Error", e)
40
-
41
- def run(self) -> None:
42
- # consume elements
43
- # TODO: are these callbacks called only one at a time or in multiple threads?
44
- # self.__handle_element("ch", "method", "properties", bytes("localhost/health", "utf-8"))
45
- # self.__handle_element("ch", "method", "properties", bytes("localhost/health", "utf-8"))
46
- self.__queue_src.start_consuming(self.handle_element)
47
- # print("CONSUME iteration")
48
- # time.sleep(1)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/services/__init__.py ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ from .db_service import init_connection
2
+ from .mq_service import create_out_adapter,PrintingOutAdapter
src/services/db_service.py ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import os

import streamlit as st
from dotenv import load_dotenv
from pymongo import MongoClient


@st.cache_resource
def init_connection():
    """Open the MongoDB connection and return the ``event_data`` database.

    Credentials are read from MONGO_USERNAME / MONGO_PASSWORD / MONGO_HOST
    in the environment (.env is loaded first).  ``st.cache_resource`` keeps
    a single shared client per Streamlit process.
    """
    load_dotenv()
    user = os.getenv('MONGO_USERNAME')
    password = os.getenv('MONGO_PASSWORD')
    host = os.getenv('MONGO_HOST')
    uri = f"mongodb+srv://{user}:{password}@{host}/?retryWrites=true&w=majority&appName=Cluster0"
    return MongoClient(uri).event_data
src/services/mq_service.py ADDED
@@ -0,0 +1,17 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import streamlit as st

from project import Config
from project.adapters import MessageQueueOutAdapterImpl, OutAdapter


class PrintingOutAdapter(OutAdapter):
    """Local stand-in for the MQ out adapter: only logs the message size."""

    def send_message(self, message):
        print(f"Publishing message {len(message)}")


@st.cache_resource
def create_out_adapter(_config: Config, queue_dest: str) -> OutAdapter:
    """Return the out adapter for ``queue_dest`` (cached per session).

    With ``mq_user == "local"`` no real queue is contacted; the printing
    dummy is used instead.  ``_config`` is underscore-prefixed so
    Streamlit's cache does not attempt to hash it.
    """
    if _config.mq_user == "local":
        return PrintingOutAdapter()
    return MessageQueueOutAdapterImpl(_config, queue_dest)
src/services/test_creator.py ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import datetime

from bson import ObjectId


class TestCreator:
    """Creates test-run documents in the ``test_evaluation`` collection."""

    def __init__(self, db):
        # db: database handle exposing the test_evaluation collection.
        self.db = db
        self.test_obj = None

    def create(self, record_ids: list[str]) -> ObjectId:
        """Insert a new pending test run covering ``record_ids``.

        Every record starts as {"status": "pending"}; returns the inserted
        document's ObjectId.

        Fixes vs. previous version: the loop variable was misleadingly
        named ``batch_id`` although it iterates record ids; the manual
        loop is now a dict comprehension, and the unused ``DatetimeMS``
        import was dropped.
        """
        test_obj = {
            "status": "pending",
            # timezone-aware UTC timestamp (datetime.UTC requires Python 3.11+)
            "created_at": datetime.datetime.now(datetime.UTC),
            "pipeline_version": "1.0.0",
            # one pending slot per record, keyed by record id
            "record_results": {record_id: {"status": "pending"} for record_id in record_ids},
        }
        insert_result = self.db.test_evaluation.insert_one(test_obj)
        return insert_result.inserted_id
src/wrapper/__init__.py CHANGED
@@ -1,2 +1 @@
1
- from .health_check import HealthCheck
2
  from .message_queue import MessageQueue
 
 
1
  from .message_queue import MessageQueue
src/wrapper/health_check.py DELETED
@@ -1,73 +0,0 @@
1
- import logging
2
- from typing import Callable, Dict
3
-
4
- from flask import Flask
5
- from flask.typing import ResponseValue
6
-
7
-
8
- # adapted from https://stackoverflow.com/questions/62000942/flask-block-specific-endpoints-from-logging
9
- class AjaxFilter(logging.Filter):
10
- def filter(self, record):
11
- return "/health" not in record.getMessage()
12
-
13
-
14
- class HealthCheck:
15
- # adapted from https://thelinuxnotes.com/index.php/implementing-a-flask-health-check-and-kubernetes-liveness-probe-in-python-application/
16
- def __do_check(self) -> tuple[ResponseValue, int]:
17
- # https://docs.python.org/3.9/library/functions.html#all
18
- # https://stackoverflow.com/questions/12229064/mapping-over-values-in-a-python-dictionary
19
- # loop over checks
20
- # execute check method for each key
21
- results = {k: v() for k, v in self.__checks.items()}
22
- overall_result = all(results.values())
23
- json = {
24
- "state": "OK" if overall_result else "ERROR",
25
- "elements": results
26
- }
27
- status_code = 200 if overall_result else 500
28
- return json, status_code
29
-
30
- def __init__(self, app: Flask) -> None:
31
- self.__app = app
32
- self.__checks: Dict[str, Callable[[], bool]] = {}
33
- self.__log = logging.getLogger('werkzeug')
34
- self.disable_logging_filter = AjaxFilter()
35
-
36
- # register route dynamically
37
- # https://pytutorial.com/flask-appadd_url_rule-dynamic-url-registration-guide/
38
- # @app.route('/health')
39
- # @app.route('/health/')
40
- self.__app.add_url_rule(
41
- '/health', # URL rule with variable
42
- 'health_check', # Endpoint name
43
- view_func=self.__do_check, # View function
44
- methods=['GET'], # Allowed methods
45
- strict_slashes=False # URL trailing slash handling
46
- )
47
-
48
- def disable_logging(self) -> None:
49
- print("health check logging disabled")
50
- self.__log.addFilter(self.disable_logging_filter)
51
-
52
- def enable_logging(self) -> None:
53
- print("health check logging enabled")
54
- self.__log.removeFilter(self.disable_logging_filter)
55
-
56
- def add_check(self, name, method: Callable[[], bool]) -> None:
57
- """method to add named check methods to health check
58
- Parameters
59
- ----------
60
- name : str
61
- named key of check method
62
- method : function
63
- method which returns a bool for the health check
64
- """
65
- self.__checks.update({f"{name}": method})
66
-
67
- def wrap_exception_handler(self, name: str, method: Callable[[], None]) -> None:
68
- """wrap exception handler around method and register simple health check for it"""
69
- self.add_check(name, lambda: True)
70
- try:
71
- method()
72
- finally:
73
- self.add_check(name, lambda: False)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/wrapper/message_queue.py CHANGED
@@ -42,6 +42,7 @@ class MessageQueue:
42
  properties=pika.BasicProperties(
43
  delivery_mode=pika.DeliveryMode.Persistent
44
  ))
 
45
 
46
  # try repeating once, if connection was lost
47
  retry = True
@@ -53,6 +54,7 @@ class MessageQueue:
53
  print("Reconnecting...")
54
  if retry:
55
  self.connect()
 
56
  retry = False
57
  else:
58
  raise e
@@ -60,21 +62,13 @@ class MessageQueue:
60
  # connection.close()
61
 
62
  def __ack_and_call_callback(
63
- self,
64
- ch: BlockingChannel,
65
- method: spec.Basic.Deliver,
66
- properties: spec.BasicProperties,
67
- body: bytes) -> None:
68
  # acknowledge processing directly
69
  ch.basic_ack(delivery_tag=method.delivery_tag)
70
  # print(f" [x] Received {body.decode()}")
71
  self.__callback(ch, method, properties, body)
72
  # print(" [x] Done")
73
-
74
- def start_consuming(self, callback: Callable[[BlockingChannel, spec.Basic.Deliver, spec.BasicProperties, bytes], None]) -> None:
75
- """callback method parameter is a method with the following signature:
76
- def callback(ch, method, properties, body):"""
77
- self.__callback = callback
78
- self.__channel.basic_qos(prefetch_count=1)
79
- self.__channel.basic_consume(queue=self.__queue_name, on_message_callback=self.__ack_and_call_callback)
80
- self.__channel.start_consuming()
 
42
  properties=pika.BasicProperties(
43
  delivery_mode=pika.DeliveryMode.Persistent
44
  ))
45
+ print(f"[PUBLISH] Queued message in {self.__queue_name}")
46
 
47
  # try repeating once, if connection was lost
48
  retry = True
 
54
  print("Reconnecting...")
55
  if retry:
56
  self.connect()
57
+ __send()
58
  retry = False
59
  else:
60
  raise e
 
62
  # connection.close()
63
 
64
  def __ack_and_call_callback(
65
+ self,
66
+ ch: BlockingChannel,
67
+ method: spec.Basic.Deliver,
68
+ properties: spec.BasicProperties,
69
+ body: bytes) -> None:
70
  # acknowledge processing directly
71
  ch.basic_ack(delivery_tag=method.delivery_tag)
72
  # print(f" [x] Received {body.decode()}")
73
  self.__callback(ch, method, properties, body)
74
  # print(" [x] Done")