| import base64 | |
| import json | |
| import streamlit as st | |
| from dotenv import load_dotenv | |
| from project import Config | |
| from services.db_service import init_connection | |
| from services.mq_service import create_out_adapter | |
| from services.test_creator import TestCreator | |
| load_dotenv() | |
| config = Config() | |
| config.display() | |
| out_adapter = create_out_adapter(config, config.queue_dest_test_trigger) | |
| db = init_connection() | |
| test_creator = TestCreator(db) | |
| st.title("Testing") | |
| st.write(f"Es sind {db.testdata_1.count_documents({})} Testdatensätze vorhanden.") | |
| with st.form("Testkonfiguration"): | |
| batch_id = st.selectbox("Testbatch wählen", [0, 1, 2, 3, "Alle"]) | |
| start_test = st.form_submit_button("Starte Test") | |
| if start_test: | |
| if batch_id == "Alle": | |
| batch = list(db.testdata_1.find({})) | |
| else: | |
| batch = list(db.testdata_1.find({"batch_id": batch_id})) | |
| record_ids = [str(data["_id"]) for data in batch] | |
| test_id = test_creator.create(record_ids) | |
| if test_id: | |
| st.info(f"Starte Test mit {len(batch)} Testdaten, Test-ID: {str(test_id)}") | |
| st.write(f"**Testdaten aus dem Batch:**") | |
| for testdata in batch: | |
| id = str(testdata["_id"]) | |
| url = testdata.get("url") | |
| html = testdata.get("html", None) | |
| if html: | |
| html = html.decode("utf-8") | |
| else: html = "" | |
| data = testdata.get("data") | |
| st.write(f"Data ID: {id}") | |
| st.write(f"URL: {url}") | |
| st.write("___") | |
| try: | |
| message = { | |
| "url": url, | |
| "body": base64.b64encode(html.encode("utf-8")).decode("utf-8"), | |
| "meta": [{ | |
| "service_name": "demo-service", | |
| "queue_dest": "storehtml-out", | |
| "test_id": str(test_id), | |
| "record_id": id, | |
| }] | |
| } | |
| encoded = json.dumps(message).encode("utf-8") | |
| out_adapter.send_message(encoded) | |
| print(f"Sent {url}") | |
| except Exception as e: | |
| st.error(f"Fehler beim senden in die Message Queue: {e}") | |
| else: | |
| st.error("Fehler beim anlegen des Tests") | |