# light-mixing / scripts / rgb_led_sensor.py
# sgbaird: chore: Update RGB control panel with Streamlit and MQTT implementation
# (commit 758a4c9)
import json
import queue
import threading
import paho.mqtt.client as mqtt
import streamlit as st
# Page header
st.title("RGB Command and Sensor Data Panel")

# Broker connection settings. Every field is pre-filled and can be edited by
# the user; host and password are rendered as masked inputs.
# NOTE(review): the defaults embed a broker host, username and password in
# the source. Confirm these are intentionally public demo credentials.
HIVEMQ_HOST = st.text_input(
    "Enter your HiveMQ host:",
    value="248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud",
    type="password",
)
HIVEMQ_USERNAME = st.text_input("Enter your HiveMQ username:", value="sgbaird")
HIVEMQ_PASSWORD = st.text_input(
    "Enter your HiveMQ password:",
    value="D.Pq5gYtejYbU#L",
    type="password",
)
PORT = st.number_input(
    "Enter the port number:",
    min_value=1,
    max_value=65535,
    value=8883,
)

# Identifier of the target Pico; it is used as the MQTT topic prefix when a
# command is sent.
pico_id = st.text_input("Enter your Pico ID:", value="test", type="password")

# Cap the channel values at a fraction of the full 0-255 range.
max_power = 0.3
max_value = round(max_power * 255)

# Tell the user why the sliders stop short of 255.
_power_notice = f"The maximum power has been reduced. The upper limit for RGB values is {max_value} instead of 255."
st.info(_power_notice)

# One slider per colour channel, all sharing the same capped range and a
# starting value of 0.
_channel_range = {"min_value": 0, "max_value": max_value, "value": 0}
R = st.slider("Select the Red value:", **_channel_range)
G = st.slider("Select the Green value:", **_channel_range)
B = st.slider("Select the Blue value:", **_channel_range)
# Seed the per-session state on first use: an empty message history and an
# unlocked send button. Existing values are left untouched.
for _state_key, _initial_value in (("messages", []), ("locked", False)):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
# Fallback queue, used by send_and_receive only for clients that do not carry
# their own ``sensor_data_queue`` attribute (i.e. clients that were not built
# by get_paho_client).
sensor_data_queue = queue.Queue()


# Singleton: https://docs.streamlit.io/develop/api-reference/caching-and-state/st.cache_resource
@st.cache_resource
def get_paho_client(
    sensor_data_topic, hostname, username, password=None, port=8883, tls=True
):
    """Create, connect and start an MQTT client that collects sensor readings.

    The function is wrapped in ``st.cache_resource``, so calls with the same
    arguments reuse the client that was created first.

    Args:
        sensor_data_topic: Topic subscribed to (QoS 1) once the client connects.
        hostname: Broker host name.
        username: Broker user name.
        password: Broker password, or None.
        port: Broker port.
        tls: When true, TLS is enabled on the connection before connecting.

    Returns:
        The paho client with its background network loop started. Each
        message received is parsed as JSON and put on the ``queue.Queue``
        exposed as ``client.sensor_data_queue``.
    """
    # Streamlit re-executes the whole script on every interaction, which
    # rebinds any module-level queue to a brand-new object, whereas this
    # cached client (and its callbacks) outlives the rerun. Creating the queue
    # together with the client keeps the producer (on_message) and the
    # consumer (send_and_receive) on the same object across reruns.
    inbox = queue.Queue()

    client = mqtt.Client(protocol=mqtt.MQTTv5)  # create new instance
    client.sensor_data_queue = inbox

    def on_message(client, userdata, msg):
        # Payloads are expected to be JSON documents.
        inbox.put(json.loads(msg.payload))

    def on_connect(client, userdata, flags, rc, properties=None):
        # Only a non-zero result code is reported.
        if rc != 0:
            print("Connected with result code " + str(rc))
        client.subscribe(sensor_data_topic, qos=1)

    client.on_connect = on_connect
    client.on_message = on_message

    if tls:
        client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT)
    client.username_pw_set(username, password)
    client.connect(hostname, port)
    client.loop_start()  # Use a non-blocking loop
    return client


def send_and_receive(client, command_topic, msg, queue_timeout=60):
    """Publish a command and return the next sensor reading.

    Args:
        client: MQTT client, normally the one returned by get_paho_client.
        command_topic: Topic the command is published to (QoS 2).
        msg: JSON-serialisable command payload.
        queue_timeout: Seconds to wait for a reading before giving up.

    Returns:
        The first reading put on the client's queue after the command was
        published.

    Raises:
        queue.Empty: If no reading arrives within ``queue_timeout`` seconds.
    """
    inbox = getattr(client, "sensor_data_queue", sensor_data_queue)

    # Discard readings that arrived before this command was sent (for example
    # a late reply to an earlier command that already timed out), so they are
    # not mistaken for the reply to this command.
    while True:
        try:
            inbox.get_nowait()
        except queue.Empty:
            break

    client.publish(command_topic, json.dumps(msg), qos=2)
    return inbox.get(True, queue_timeout)
# Publish button: send the selected RGB values to the Pico and show the
# sensor reading that comes back. Clicks are ignored while a previous command
# holds the lock.
if st.button("Send RGB Command") and not st.session_state.locked:
    if not pico_id or not HIVEMQ_HOST or not HIVEMQ_USERNAME or not HIVEMQ_PASSWORD:
        st.error("Please enter all required fields.")
    else:
        command_topic = f"{pico_id}/neopixel"
        sensor_data_topic = f"{pico_id}/as7341"
        command_msg = {"R": R, "G": G, "B": B}
        response_timeout = 30  # seconds to wait for the sensor reading

        st.session_state.locked = True
        try:
            client = get_paho_client(
                sensor_data_topic,
                HIVEMQ_HOST,
                HIVEMQ_USERNAME,
                password=HIVEMQ_PASSWORD,
                port=int(PORT),
                tls=True,
            )
            sensor_data = send_and_receive(
                client, command_topic, command_msg, queue_timeout=response_timeout
            )
        except queue.Empty:
            # send_and_receive waits on a queue with a timeout; report the
            # timeout to the user instead of surfacing a traceback.
            st.error(
                f"No sensor data received within {response_timeout} seconds. "
                "Check the Pico ID and that the device is online."
            )
        else:
            st.success("Command sent successfully!")
            st.write("Sensor Data Received:", sensor_data)
        finally:
            # Always release the lock; otherwise a timeout or a connection
            # error would leave the button disabled for the rest of the
            # session.
            st.session_state.locked = False
# Render the message history kept in the session state, one line per entry.
# NOTE(review): nothing in the visible code appends to
# st.session_state.messages, so this list may always be empty - confirm.
_history = st.session_state.messages
for _entry in _history:
    st.write(f"Received message: {_entry}")