Spaces:
Sleeping
Sleeping
| import json | |
| import queue | |
| import threading | |
| import paho.mqtt.client as mqtt | |
| import streamlit as st | |
| # Initialize Streamlit app | |
| st.title("RGB Command and Sensor Data Panel") | |
| # MQTT Configuration | |
| HIVEMQ_HOST = st.text_input( | |
| "Enter your HiveMQ host:", | |
| "248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud", | |
| type="password", | |
| ) | |
| HIVEMQ_USERNAME = st.text_input("Enter your HiveMQ username:", "sgbaird") | |
| HIVEMQ_PASSWORD = st.text_input( | |
| "Enter your HiveMQ password:", "D.Pq5gYtejYbU#L", type="password" | |
| ) | |
| PORT = st.number_input( | |
| "Enter the port number:", min_value=1, max_value=65535, value=8883 | |
| ) | |
| # User input for the Pico ID | |
| pico_id = st.text_input("Enter your Pico ID:", "test", type="password") | |
| max_power = 0.3 | |
| max_value = round(max_power * 255) | |
| # Information about the maximum power reduction | |
| st.info( | |
| f"The maximum power has been reduced. The upper limit for RGB values is {max_value} instead of 255." | |
| ) | |
| # Sliders for RGB values | |
| R = st.slider("Select the Red value:", min_value=0, max_value=max_value, value=0) | |
| G = st.slider("Select the Green value:", min_value=0, max_value=max_value, value=0) | |
| B = st.slider("Select the Blue value:", min_value=0, max_value=max_value, value=0) | |
| # Initialize session state for messages and lock state | |
| if "messages" not in st.session_state: | |
| st.session_state.messages = [] | |
| if "locked" not in st.session_state: | |
| st.session_state.locked = False | |
| # Queue to hold sensor data | |
| sensor_data_queue = queue.Queue() | |
| # Singleton: https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource | |
| def get_paho_client( | |
| sensor_data_topic, hostname, username, password=None, port=8883, tls=True | |
| ): | |
| client = mqtt.Client(protocol=mqtt.MQTTv5) # create new instance | |
| def on_message(client, userdata, msg): | |
| sensor_data_queue.put(json.loads(msg.payload)) | |
| def on_connect(client, userdata, flags, rc, properties=None): | |
| if rc != 0: | |
| print("Connected with result code " + str(rc)) | |
| client.subscribe(sensor_data_topic, qos=1) | |
| client.on_connect = on_connect | |
| client.on_message = on_message | |
| if tls: | |
| client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT) | |
| client.username_pw_set(username, password) | |
| client.connect(hostname, port) | |
| client.loop_start() # Use a non-blocking loop | |
| return client | |
| def send_and_receive(client, command_topic, msg, queue_timeout=60): | |
| client.publish(command_topic, json.dumps(msg), qos=2) | |
| while True: | |
| sensor_data = sensor_data_queue.get(True, queue_timeout) | |
| return sensor_data | |
| # Publish button | |
| if st.button("Send RGB Command") and not st.session_state.locked: | |
| if not pico_id or not HIVEMQ_HOST or not HIVEMQ_USERNAME or not HIVEMQ_PASSWORD: | |
| st.error("Please enter all required fields.") | |
| else: | |
| st.session_state.locked = True | |
| command_topic = f"{pico_id}/neopixel" | |
| sensor_data_topic = f"{pico_id}/as7341" | |
| client = get_paho_client( | |
| sensor_data_topic, | |
| HIVEMQ_HOST, | |
| HIVEMQ_USERNAME, | |
| password=HIVEMQ_PASSWORD, | |
| port=int(PORT), | |
| tls=True, | |
| ) | |
| command_msg = {"R": R, "G": G, "B": B} | |
| sensor_data = send_and_receive( | |
| client, command_topic, command_msg, queue_timeout=30 | |
| ) | |
| st.session_state.locked = False | |
| st.success("Command sent successfully!") | |
| st.write("Sensor Data Received:", sensor_data) | |
| # Display received messages | |
| for message in st.session_state.messages: | |
| st.write(f"Received message: {message}") | |